diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 3e15fd9c57735ece5ad49c3381f52c11ca9551fa..b455d779c16887c6b12bcfb7985d2778232bcb32 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -48,7 +48,6 @@ struct SSmaEnv { typedef struct { int32_t smaRef; - int32_t refId; } SSmaMgmt; #define SMA_ENV_LOCK(env) ((env)->lock) @@ -63,12 +62,12 @@ struct STSmaStat { struct SRSmaStat { SSma *pSma; - int64_t refId; - void *tmrHandle; - tmr_h tmrId; - int32_t tmrSeconds; - int8_t triggerStat; - int8_t runningStat; + int64_t refId; // shared by persistence/fetch tasks + void *tmrHandle; // for persistence task + tmr_h tmrId; // for persistence task + int32_t tmrSeconds; // for persistence task + int8_t triggerStat; // for persistence task + int8_t runningStat; // for persistence task SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; }; diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index e70e79fb6fbca87617e67ce2209d2af7b6e45c4b..120d6612a25665b295190434cc00f2ac42e5ce21 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -135,7 +135,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS // init smaMgmt smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat); - if (smaMgmt.refId < 0) { + if (smaMgmt.smaRef < 0) { smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM); terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index bd74bbd639d4c38c097ba2037ecbf49424035b1b..bf662ddc05bda6326a947cde76d896056067f585 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -50,6 +50,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); struct SRSmaInfoItem { SRSmaInfo *pRsmaInfo; + int64_t refId; void *taskInfo; // qTaskInfo_t tmr_h tmrId; int8_t level; @@ -60,11 +61,14 @@ struct SRSmaInfoItem { struct SRSmaInfo { STSchema *pTSchema; - SSma *pSma; + SRSmaStat *pStat; int64_t suid; SRSmaInfoItem items[TSDB_RETENTION_L2]; }; +#define RSMA_INFO_SMA(r) ((r)->pStat->pSma) +#define RSMA_INFO_STAT(r) ((r)->pStat) + struct SRSmaQTaskInfoItem { int32_t len; int8_t type; @@ -107,22 +111,21 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, void *tdFreeRSmaInfo(SRSmaInfo *pInfo) { if (pInfo) { + SSma *pSma = RSMA_INFO_SMA(pInfo); for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = &pInfo->items[i]; if (pItem->taskInfo) { - smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pInfo->pSma), pInfo->suid, - pItem->tmrId, i + 1); + smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId, + i + 1); taosTmrStopA(&pItem->tmrId); - tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pInfo->pSma), i + 1); + tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pSma), i + 1); } else { - smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", - SMA_VID(pInfo->pSma), pInfo->suid, i + 1); + smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), + pInfo->suid, i + 1); } } taosMemoryFree(pInfo->pTSchema); taosMemoryFree(pInfo); - } else { - smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info since empty", SMA_VID(pInfo->pSma), pInfo->suid); } return NULL; @@ -255,6 +258,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo if (param->qmsg[idx]) { SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); + pItem->refId = RSMA_REF_ID(pRSmaInfo->pStat); pItem->pRsmaInfo = pRSmaInfo; pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle); if (!pItem->taskInfo) { @@ -340,7 +344,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con goto _err; } pRSmaInfo->pTSchema = pTSchema; - pRSmaInfo->pSma = pSma; + pRSmaInfo->pStat = pStat; pRSmaInfo->suid = suid; if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) { @@ -522,7 +526,7 @@ static void tdDestroySDataBlockArray(SArray *pArray) { static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) { SArray *pResult = NULL; SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo; - SSma *pSma = pRSmaInfo->pSma; + SSma *pSma = RSMA_INFO_SMA(pRSmaInfo); while (1) { SSDataBlock *output = NULL; @@ -585,21 +589,29 @@ _err: */ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfoItem *pItem = param; - SSma *pSma = pItem->pRsmaInfo->pSma; - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv); + SSma *pSma = NULL; + SRSmaStat *pStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, pItem->refId); + if (!pStat) { + smaDebug("rsma fetch task not start since already destroyed"); + return; + } + pSma = RSMA_INFO_SMA(pItem->pRsmaInfo); + + // if rsma trigger stat in cancelled or finished, not start fetch task anymore int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat)); if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) { - smaDebug("vgId:%d, level %" PRIi8 " not fetch since stat is cancelled for table suid:%" PRIi64, SMA_VID(pSma), - pItem->level, pItem->pRsmaInfo->suid); + taosReleaseRef(smaMgmt.smaRef, pItem->refId); + smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled", + SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid); return; } int8_t fetchTriggerStat = atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) { - smaDebug("vgId:%d, level %" PRIi8 " stat is active for table suid:%" PRIi64, SMA_VID(pSma), pItem->level, - pItem->pRsmaInfo->suid); + smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma), + pItem->level, pItem->pRsmaInfo->suid); tdRefSmaStat(pSma, (SSmaStat *)pStat); @@ -610,9 +622,11 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { tdUnRefSmaStat(pSma, (SSmaStat *)pStat); } else { - smaDebug("vgId:%d, level %" PRIi8 " stat is inactive for table suid:%" PRIi64, SMA_VID(pSma), pItem->level, - pItem->pRsmaInfo->suid); + smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive", + SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid); } +_end: + taosReleaseRef(smaMgmt.smaRef, pItem->refId); } static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid, @@ -632,7 +646,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); - smaDebug("vgId:%d, process rsma insert", SMA_VID(pSma)); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat); @@ -1036,7 +1049,7 @@ static void *tdRSmaPersistExec(void *param) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo; if (!taskInfo) { - smaDebug("vgId:%d, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); + smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); continue; } @@ -1044,27 +1057,20 @@ static void *tdRSmaPersistExec(void *param) { int32_t len = 0; int8_t type = (int8_t)(i + 1); if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) { - smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1, - terrstr(terrno)); + smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid, + i + 1, terrstr(terrno)); goto _err; } if (!pOutput || len <= 0) { - smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success but no output(len %d), not persist", + smaDebug("vgId:%d, rsma, table %" PRIi64 + " level %d serialize qTaskInfo success but no output(len %d), not persist", vid, pRSmaInfo->suid, i + 1, len); taosMemoryFreeClear(pOutput); continue; } - smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d, need persist", vid, + smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo success with len %d, need persist", vid, pRSmaInfo->suid, i + 1, len); -#if 0 - if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) { - smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid, - i + 1, terrstr(terrno)); - } else { - smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1); - } -#endif if (!isFileCreated) { char qTaskInfoFName[TSDB_FILENAME_LEN]; @@ -1084,11 +1090,11 @@ static void *tdRSmaPersistExec(void *param) { ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN); tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset); - smaDebug("vgId:%d, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid, + smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid, pRSmaInfo->suid, i + 1, headLen, toffset); tdAppendTFile(&tFile, pOutput, len, &toffset); - smaDebug("vgId:%d, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, - i + 1, len, toffset); + smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, + pRSmaInfo->suid, i + 1, len, toffset); taosMemoryFree(pOutput); } @@ -1098,13 +1104,13 @@ static void *tdRSmaPersistExec(void *param) { _normal: if (isFileCreated) { if (tdUpdateTFileHeader(&tFile) < 0) { - smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), + smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), tstrerror(terrno)); tdCloseTFile(&tFile); tdRemoveTFile(&tFile); goto _err; } else { - smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile)); + smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile)); } tdCloseTFile(&tFile); @@ -1114,10 +1120,10 @@ _normal: char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_F]); strncpy(pos, tdQTaskInfoFname[TD_QTASK_TMP_F], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) { - smaError("vgId:%d, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); + smaError("vgId:%d, rsma, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); goto _err; } else { - smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); + smaDebug("vgId:%d, rsma, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); } } goto _end; @@ -1129,13 +1135,13 @@ _end: if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INACTIVE, TASK_TRIGGER_STAT_ACTIVE)) { - smaDebug("vgId:%d, persist task is active again", vid); + smaDebug("vgId:%d, rsma persist task is active again", vid); } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_CANCELLED, TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("vgId:%d, persist task is cancelled", vid); + smaDebug("vgId:%d, rsma persist task is cancelled", vid); } else { - smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); + smaWarn("vgId:%d, rsma persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); ASSERT(0); } atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); @@ -1179,9 +1185,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { */ static void tdRSmaPersistTrigger(void *param, void *tmrId) { SRSmaStat *rsmaStat = param; - int64_t refId = rsmaStat->refId; + SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, rsmaStat->refId); - SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, refId); if (!pRSmaStat) { smaDebug("rsma persistence task not start since already destroyed"); return; @@ -1221,5 +1226,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); } break; } - taosReleaseRef(smaMgmt.smaRef, refId); + taosReleaseRef(smaMgmt.smaRef, rsmaStat->refId); }