diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 23ad70bad3020fd6fcef2e7ae384757e675c2e40..5e5fbe861eead4311b744523854289ee1a617da0 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -28,7 +28,8 @@ extern "C" { #define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define smaWarn(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) #define smaInfo(...) do { if (smaDebugFlag & DEBUG_INFO) { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) +// #define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define smaDebug(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) #define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on @@ -46,6 +47,11 @@ struct SSmaEnv { SSmaStat *pStat; }; +typedef struct { + int32_t smaRef; + int32_t refId; +} SSmaMgmt; + #define SMA_ENV_LOCK(env) ((env)->lock) #define SMA_ENV_TYPE(env) ((env)->type) #define SMA_ENV_STAT(env) ((env)->pStat) @@ -58,6 +64,7 @@ struct STSmaStat { struct SRSmaStat { SSma *pSma; + int64_t refId; void *tmrHandle; tmr_h tmrId; int32_t tmrSeconds; @@ -73,6 +80,7 @@ struct SSmaStat { }; T_REF_DECLARE() }; + #define SMA_TSMA_STAT(s) (&(s)->tsmaStat) #define SMA_RSMA_STAT(s) (&(s)->rsmaStat) #define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) @@ -80,6 +88,7 @@ struct SSmaStat { #define RSMA_TMR_HANDLE(r) ((r)->tmrHandle) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_RUNNING_STAT(r) (&(r)->runningStat) +#define RSMA_REF_ID(r) ((r)->refId) enum { TASK_TRIGGER_STAT_INIT = 0, @@ -192,10 +201,18 @@ typedef struct STFInfo STFInfo; typedef struct STFile STFile; struct STFInfo { + // common fields uint32_t magic; uint32_t ftype; uint32_t fver; int64_t fsize; + + // specific fields + union { + struct { + int64_t applyVer[2]; + } qTaskInfo; + }; }; struct STFile { @@ -230,7 +247,7 @@ int32_t tdUpdateTFileHeader(STFile *pTFile); void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); void tdCloseTFile(STFile *pTFile); -void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName); +void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, char *outputName); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index baead763ad46a48196ebf62c1e12f21d004e9511..6bd7d4edd14a1ee5279b006a8613928756b97095 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -27,6 +27,7 @@ #include "tdatablock.h" #include "tdb.h" #include "tencode.h" +#include "tref.h" #include "tfs.h" #include "tglobal.h" #include "tjson.h" diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 7f115633b95d551d66db5df95243e46b321fadf9..36ee20fe257dd1ac7eeaec1a854ff0806afe3d03 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -18,6 +18,9 @@ typedef struct SSmaStat SSmaStat; #define RSMA_TASK_INFO_HASH_SLOT 8 +#define SMA_MGMT_REF_NUM 1024 + +extern SSmaMgmt smaMgmt; // declaration of static functions @@ -25,6 +28,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *p static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path); static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv); static void *tdFreeTSmaStat(STSmaStat *pStat); +static void tdDestroyRSmaStat(void *pRSmaStat); // implementation @@ -128,6 +132,22 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS if (smaType == TSDB_SMA_TYPE_ROLLUP) { SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat); pRSmaStat->pSma = (SSma *)pSma; + + // init smaMgmt + smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat); + if (smaMgmt.refId < 0) { + smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + + int64_t refId = taosAddRef(smaMgmt.smaRef, pRSmaStat); + if (refId < 0) { + smaError("taosAddRef smaRef failed, since:%s", tstrerror(terrno)); + return TSDB_CODE_FAILED; + } + pRSmaStat->refId = refId; + // init timer RSMA_TMR_HANDLE(pRSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA"); if (!RSMA_TMR_HANDLE(pRSmaStat)) { @@ -150,6 +170,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS } else { ASSERT(0); } + } return TSDB_CODE_SUCCESS; } @@ -169,9 +190,10 @@ static void *tdFreeTSmaStat(STSmaStat *pStat) { return NULL; } -static void tdDestroyRSmaStat(SRSmaStat *pStat) { - if (pStat) { - smaDebug("vgId:%d destroy rsma stat", SMA_VID(pStat->pSma)); +static void tdDestroyRSmaStat(void *pRSmaStat) { + if (pRSmaStat) { + SRSmaStat *pStat = (SRSmaStat *)pRSmaStat; + smaDebug("vgId:%d %s:%d destroy rsma stat %p", SMA_VID(pStat->pSma), __func__, __LINE__, pRSmaStat); // step 1: set persistence task cancelled atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); @@ -183,9 +205,10 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) { if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) { while (1) { if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) { + smaDebug("rsma, persist task finished already"); break; } else { - smaDebug("not destroyed since rsma stat in %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pStat))); + smaDebug("rsma, persist task not finished yet since rsma stat in %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pStat))); } ++nLoops; if (nLoops > 1000) { @@ -209,7 +232,10 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) { nLoops = 0; while (1) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { + smaDebug("rsma, all fetch task finished already"); break; + } else { + smaDebug("rsma, fetch tasks not all finished yet"); } ++nLoops; if (nLoops > 1000) { @@ -222,18 +248,17 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) { if (RSMA_TMR_HANDLE(pStat)) { taosTmrCleanUp(RSMA_TMR_HANDLE(pStat)); } - } -} -static void *tdFreeRSmaStat(SRSmaStat *pStat) { - tdDestroyRSmaStat(pStat); - taosMemoryFreeClear(pStat); - return NULL; + } } void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { tdDestroySmaState(pSmaStat, smaType); - taosMemoryFreeClear(pSmaStat); + if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { + taosMemoryFreeClear(pSmaStat); + } + // tref used to free rsma stat + return NULL; } @@ -243,17 +268,21 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { * @param pSmaStat * @return int32_t */ + int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { if (pSmaStat) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat)); } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { - tdDestroyRSmaStat(SMA_RSMA_STAT(pSmaStat)); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat); + if (taosRemoveRef(smaMgmt.smaRef, RSMA_REF_ID(pRSmaStat)) < 0) { + smaError("remove refId from smaRef failed, refId:0x%" PRIx64, RSMA_REF_ID(pRSmaStat)); + } } else { ASSERT(0); } } - return TSDB_CODE_SUCCESS; + return 0; } int32_t tdLockSma(SSma *pSma) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 4402f37c34d24be6176590ae14d93507837dd9c6..62dddb2da7bdf2cbdbb540e168cc615f95555e91 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -18,9 +18,13 @@ #define RSMA_QTASKINFO_PERSIST_MS 7200000 #define RSMA_QTASKINFO_BUFSIZE 32768 #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid -typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T; -static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"}; +SSmaMgmt smaMgmt = { + .smaRef = -1, +}; + +typedef enum { TD_QTASK_TMP_F = 0, TD_QTASK_CUR_F} TD_QTASK_FILE_T; +static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"}; typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; @@ -80,6 +84,10 @@ struct SRSmaQTaskInfoIter { int32_t nBufPos; }; +static void tdRSmaQTaskInfoGetFName(int32_t vgId, int8_t ftype, char *outputName) { + tdGetVndFileName(vgId, VNODE_RSMA_DIR, tdQTaskInfoFname[ftype], outputName); +} + static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { return lenWithHead - RSMA_QTASKINFO_HEAD_LEN; } @@ -761,7 +769,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { STFile tFile = {0}; char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName); + tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_TMP_F, qTaskInfoFName); if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) { goto _err; } @@ -1003,10 +1011,6 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { return TSDB_CODE_SUCCESS; } -static void tdRSmaQTaskInfoGetFName(int32_t vid, int8_t ftype, char *outputName) { - tdGetVndFileName(vid, VNODE_RSMA_DIR, tdQTaskInfoFname[ftype], outputName); -} - static void *tdRSmaPersistExec(void *param) { setThreadName("rsma-task-persist"); SRSmaStat *pRSmaStat = param; @@ -1063,7 +1067,7 @@ static void *tdRSmaPersistExec(void *param) { if (!isFileCreated) { char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName); + tdRSmaQTaskInfoGetFName(vid, TD_QTASK_TMP_F, qTaskInfoFName); tdInitTFile(&tFile, pTfs, qTaskInfoFName); tdCreateTFile(&tFile, pTfs, true, -1); @@ -1079,8 +1083,8 @@ static void *tdRSmaPersistExec(void *param) { ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN); tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset); - smaDebug("vgId:%d, table %" PRIi64 " level %d head part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, - i + 1, headLen, toffset); + smaDebug("vgId:%d, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid, + pRSmaInfo->suid, i + 1, headLen, toffset); tdAppendTFile(&tFile, pOutput, len, &toffset); smaDebug("vgId:%d, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, i + 1, len, toffset); @@ -1106,8 +1110,8 @@ _normal: char newFName[TSDB_FILENAME_LEN]; strncpy(newFName, TD_TFILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN); - char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]); - strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); + char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_F]); + strncpy(pos, tdQTaskInfoFname[TD_QTASK_TMP_F], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) { smaError("vgId:%d, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); goto _err; @@ -1134,6 +1138,7 @@ _end: ASSERT(0); } atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); + taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId); taosThreadExit(NULL); return NULL; } @@ -1159,6 +1164,7 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { ASSERT(0); } atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); + taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId); } taosThreadAttrDestroy(&thAttr); @@ -1171,9 +1177,16 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { * @param tmrId */ static void tdRSmaPersistTrigger(void *param, void *tmrId) { - SRSmaStat *pRSmaStat = param; + SRSmaStat *rsmaStat = param; + int64_t refId = rsmaStat->refId; + + SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, refId); + if(!pRSmaStat) { + smaDebug("rsma persistence task not start since already destroyed"); + return; + } - int8_t tmrStat = + int8_t tmrStat = atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); switch (tmrStat) { case TASK_TRIGGER_STAT_ACTIVE: { @@ -1191,6 +1204,7 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { } else { atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); } + return; } break; case TASK_TRIGGER_STAT_CANCELLED: { atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED); @@ -1206,4 +1220,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); } break; } + taosReleaseRef(smaMgmt.smaRef, refId); } diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 5f78aadddf6fdbb98d0efab356d02090a1b381f5..17bc2cdaca2d8dfcdd334d9cd7e30d1a394ff63c 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -141,8 +141,8 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) } #if 1 - smaDebug("append to file %s, offset:%" PRIi64 " + nbyte:%" PRIi64 " =%" PRIi64, TD_TFILE_FULL_NAME(pTFile), toffset, - nbyte, toffset + nbyte); + smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile), + toffset, nbyte, toffset + nbyte); #endif ASSERT(pTFile->info.fsize == toffset); @@ -179,8 +179,8 @@ void tdCloseTFile(STFile *pTFile) { } } -void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName) { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/%s", vid, dname, fname); +void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, char *outputName) { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/%s", vgId, dname, fname); } int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {