diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3c3071c8df67ef8ec47dc20792bd90f4016909b1..54acc622162a25e65b2a1b6eb7a2bc586093a40d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2657,6 +2657,31 @@ typedef struct { SEpSet epSet; } SVgEpSet; +typedef struct { + int64_t refId; + uint64_t handle; +} SRSmaFetchMsg; + +static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) { + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeI64(pCoder, pReq->refId) < 0) return -1; + if (tEncodeU64(pCoder, pReq->handle) < 0) return -1; + + tEndEncode(pCoder); + return 0; +} + +static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI64(pCoder, &pReq->refId) < 0) return -1; + if (tDecodeU64(pCoder, &pReq->handle) < 0) return -1; + + tEndDecode(pCoder); + return 0; +} + typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 3c88b6f5cb6ee9f7f2c4a071236134a55f877e68..8074c768064aa769814d42ae05d33fb95ac0ba9e 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -308,12 +308,12 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { * @return int32_t */ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + if (!pEnv) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); // step 1: set rsma stat @@ -337,18 +337,26 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } // step 3: swap rsmaInfoHash and iRsmaInfoHash - ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat)); + // lock + taosWLockLatch(SMA_ENV_LOCK(pEnv)); + ASSERT(RSMA_INFO_HASH(pRSmaStat)); + ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat)); RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat); RSMA_INFO_HASH(pRSmaStat) = taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); if (!RSMA_INFO_HASH(pRSmaStat)) { + // unlock + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr()); return TSDB_CODE_FAILED; } + // unlock + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + // step 4: others pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; @@ -383,44 +391,51 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { * @return int32_t */ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + if (!pEnv) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); // step 1: merge rsmaInfoHash and iRsmaInfoHash - taosWLockLatch(SMA_ENV_LOCK(pSmaEnv)); + // lock + taosWLockLatch(SMA_ENV_LOCK(pEnv)); if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { - // TODO: optimization - just switch the hash pointer if rsmaInfoHash is empty - } - - void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); - while (pIter) { - tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - - if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) { - taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); - smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), - *pSuid); - } else { - // free the resources - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; - tdFreeRSmaInfo(pSma, pRSmaInfo, false); - smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma), - *pSuid); + // just switch the hash pointer if rsmaInfoHash is empty + if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) { + SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat); + RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat); + RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash; } + } else { + void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); + while (pIter) { + tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + + if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) { + taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); + smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), + *pSuid); + } else { + // free the resources + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; + tdFreeRSmaInfo(pSma, pRSmaInfo, false); + smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma), + *pSuid); + } - pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); + pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); + } } taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat)); RSMA_IMU_INFO_HASH(pRSmaStat) = NULL; - taosWUnLockLatch(SMA_ENV_LOCK(pSmaEnv)); + // unlock + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); // step 2: cleanup outdated qtaskinfo files tdCleanupQTaskInfoFiles(pSma, pRSmaStat); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 4b831225bc73725a417f07ce029a5d541e406208..5345d68c3b365443f872aa6e3b4436fa9797c350 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -17,7 +17,7 @@ typedef struct SSmaStat SSmaStat; -#define SMA_MGMT_REF_NUM 10240 +#define SMA_MGMT_REF_NUM 10240 extern SSmaMgmt smaMgmt; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 1c6d65c4d82f0b3903ecbd3c2930602f98dd10ea..dadf92b32229b8ff0eddc4d3ad75d23772c309a7 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -38,6 +38,7 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid); static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, int8_t blkType); static void tdRSmaFetchTrigger(void *param, void *tmrId); +static void tdRSmaFetchTrigger2(void *param, void *tmrId); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); @@ -567,6 +568,30 @@ static void tdDestroySDataBlockArray(SArray *pArray) { taosArrayDestroy(pArray); } +/** + * @brief retention of rsma1/rsma2 + * + * @param pSma + * @param now + * @return int32_t + */ +int32_t smaDoRetention(SSma *pSma, int64_t now) { + int32_t code = TSDB_CODE_SUCCESS; + if (VND_IS_RSMA(pSma->pVnode)) { + return code; + } + + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pSma->pRSmaTsdb[i]) { + code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); + if (code) goto _end; + } + } + +_end: + return code; +} + static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, int8_t blkType) { SArray *pResult = NULL; @@ -697,7 +722,7 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { return pRSmaInfo; } - if (RSMA_COMMIT_STAT(pStat) == 0) { + if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat return NULL; } @@ -705,23 +730,29 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { SRSmaInfo *pCowRSmaInfo = NULL; // lock taosWLockLatch(SMA_ENV_LOCK(pEnv)); - if (!taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t))) { // 2-phase lock + if (!(pCowRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)))) { // 2-phase lock void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (iRSmaInfo) { SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo; if (pIRSmaInfo) { if (tdCloneRSmaInfo(pSma, pCowRSmaInfo, pIRSmaInfo) < 0) { + // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); return NULL; } smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid); if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) { + // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); return NULL; } } } + } else { + pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo; + ASSERT(!pCowRSmaInfo); } // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); @@ -1365,19 +1396,86 @@ _end: tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); } -int32_t smaDoRetention(SSma *pSma, int64_t now) { - int32_t code = TSDB_CODE_SUCCESS; - if (VND_IS_RSMA(pSma->pVnode)) { - return code; +/** + * @brief trigger to get rsma result + * + * @param param + * @param tmrId + */ +static void tdRSmaFetchTrigger2(void *param, void *tmrId) { + SRSmaInfoItem *pItem = param; + SSma *pSma = NULL; + SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); + + if (!pStat) { + smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, + pItem->refId); + return; } - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (pSma->pRSmaTsdb[i]) { - code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); - if (code) goto _end; + pSma = pStat->pSma; + + // if rsma trigger stat in paused, cancelled or finished, not start fetch task + int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat)); + switch (rsmaTriggerStat) { + case TASK_TRIGGER_STAT_PAUSED: + case TASK_TRIGGER_STAT_CANCELLED: { + tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); + smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 + " refId:%d", + SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId); + if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle, + &pItem->tmrId); + } + return; } + default: + break; + } + + SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + goto _end; + } + + int8_t fetchTriggerStat = + atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); + switch (fetchTriggerStat) { + case TASK_TRIGGER_STAT_ACTIVE: { + smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma), + pItem->level, pRSmaInfo->suid); + + // sync procedure => async process + tdRefRSmaInfo(pSma, pRSmaInfo); + + SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; + qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK); + tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); + tdCleanupStreamInputDataBlock(pItem->taskInfo); + + tdUnRefRSmaInfo(pSma, pRSmaInfo); + // atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); + // taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + } break; + case TASK_TRIGGER_STAT_PAUSED: { + smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); + } break; + case TASK_TRIGGER_STAT_INACTIVE: { + smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); + } break; + case TASK_TRIGGER_STAT_INIT: { + smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma), + pItem->level, pRSmaInfo->suid); + } break; + default: { + smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); + } break; } _end: - return code; -} \ No newline at end of file + tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); +}