diff --git a/include/util/thash.h b/include/util/thash.h index fc8785a8fb51ac781355b4e10d9f1dec63a1732e..781c22a56aaba0d449d1f711b32fe4bd75a39003 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -188,7 +188,7 @@ void *taosHashGetKey(void *data, size_t* keyLen); void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen); /** - * release the prevous acquired obj + * release the previous acquired obj * * @param pHashObj * @param data diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index d5b719dfb974816dfce1f8085ba4cddcbaec3600..e767d94ebd9eba24f41cd10e2b1908b95dba37ed 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -67,7 +67,6 @@ struct SRSmaStat { int64_t submitVer; int64_t refId; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks - int8_t runningStat; // for persistence task SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; }; @@ -83,7 +82,6 @@ struct SSmaStat { #define SMA_RSMA_STAT(s) (&(s)->rsmaStat) #define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) -#define RSMA_RUNNING_STAT(r) (&(r)->runningStat) #define RSMA_REF_ID(r) ((r)->refId) #define RSMA_SUBMIT_VER(r) ((r)->submitVer) @@ -93,7 +91,7 @@ enum { TASK_TRIGGER_STAT_INACTIVE = 2, TASK_TRIGGER_STAT_PAUSED = 3, TASK_TRIGGER_STAT_CANCELLED = 4, - TASK_TRIGGER_STAT_FINISHED = 5, + TASK_TRIGGER_STAT_DROPPED = 5, }; void tdDestroySmaEnv(SSmaEnv *pSmaEnv); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 57595a37d10bb1ea179bcab1ec00e94a0bc9e2e4..7b298ba830a348f280032fc36415cbb7f496bcae 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -172,8 +172,9 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level); -int32_t tdProcessRSmaCreate(SVnode* pVnode, SVCreateStbReq* pReq); +int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); +int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore); void tdUidStoreDestory(STbUidStore* pStore); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 5eb96653263da70a48383b702cc9ee89a4c4826a..2cf4fd51a96130966f5f0d0957247bb67d3aff64 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -254,26 +254,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { // step 1: set rsma trigger stat cancelled atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); - // step 2: wait the persistence thread to finish - int32_t nLoops = 0; - if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) { - while (1) { - if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) { - smaDebug("vgId:%d, rsma persist task finished already", SMA_VID(pSma)); - break; - } else { - smaDebug("vgId:%d, rsma persist task not finished yet since rsma stat in %" PRIi8, SMA_VID(pSma), - atomic_load_8(RSMA_TRIGGER_STAT(pStat))); - } - ++nLoops; - if (nLoops > 1000) { - sched_yield(); - nLoops = 0; - } - } - } - - // step 3: destroy the rsma info and associated fetch tasks + // step 2: destroy the rsma info and associated fetch tasks // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) { void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL); @@ -285,8 +266,8 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } taosHashCleanup(RSMA_INFO_HASH(pStat)); - // step 5: wait all triggered fetch tasks finished - nLoops = 0; + // step 3: wait all triggered fetch tasks finished + int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index efa2886e48bdbd97e4953a918e9e23bf922288f1..14497c6f9bed80af4b8bddd295a0300bab27c46a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -37,8 +37,6 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid); static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, int8_t blkType); static void tdRSmaFetchTrigger(void *param, void *tmrId); -static void tdRSmaPersistTrigger(void *param, void *tmrId); -static void *tdRSmaPersistExec(void *param); static void tdRSmaQTaskInfoGetFName(int32_t vid, int64_t version, char *outputName); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); @@ -68,8 +66,8 @@ struct SRSmaInfo { static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) { // adapt accordingly if definition of SRSmaInfo update int32_t rsmaInfoHeadLen = sizeof(int64_t) + sizeof(STSchema *); - ASSERT(pItem->level == 1 || pItem->level == 2); - return (SRSmaInfo *)POINTER_SHIFT(pItem, -sizeof(SRSmaInfoItem) * (pItem->level - 1) - rsmaInfoHeadLen); + ASSERT(pItem->level == 0 || pItem->level == 1); + return (SRSmaInfo *)POINTER_SHIFT(pItem, -sizeof(SRSmaInfoItem) * pItem->level - rsmaInfoHeadLen); } struct SRSmaQTaskInfoItem { @@ -278,7 +276,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; } - pItem->level = (idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2); + pItem->level = idx; smaInfo("vgId:%d table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, SMA_VID(pSma), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); @@ -375,20 +373,48 @@ _err: /** * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently * - * @param pVnode + * @param pSma * @param pReq * @return int32_t */ -int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { - SSma *pSma = pVnode->pSma; +int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) { + SVnode *pVnode = pSma->pVnode; if (!pReq->rollup) { - smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); + smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since no rollup in req", TD_VID(pVnode), pReq->name, + pReq->suid); + return TSDB_CODE_SUCCESS; + } + + if (!VND_IS_RSMA(pVnode)) { + smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name, + pReq->suid); return TSDB_CODE_SUCCESS; } return tdProcessRSmaCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name); } +/** + * @brief drop cache for stb + * + * @param pSma + * @param pReq + * @return int32_t + */ +int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { + SVnode *pVnode = pSma->pVnode; + if (!VND_IS_RSMA(pVnode)) { + smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name, + pReq->suid); + return TSDB_CODE_SUCCESS; + } + + + + smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid); + return TSDB_CODE_SUCCESS; + } + /** * @brief store suid/[uids], prefer to use array and then hash * @@ -1174,123 +1200,6 @@ _err: return TSDB_CODE_FAILED; } -static void *tdRSmaPersistExec(void *param) { - setThreadName("rsma-task-persist"); - SRSmaStat *pRSmaStat = param; - SSma *pSma = pRSmaStat->pSma; - - int8_t triggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)); - - if (TASK_TRIGGER_STAT_CANCELLED == triggerStat || TASK_TRIGGER_STAT_PAUSED == triggerStat) { - goto _end; - } - - // execution - tdRSmaPersistExecImpl(pRSmaStat); - -_end: - if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_INACTIVE, - TASK_TRIGGER_STAT_ACTIVE)) { - smaDebug("vgId:%d, rsma persist task is active again", SMA_VID(pSma)); - } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_CANCELLED, - TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("vgId:%d, rsma persist task is cancelled", SMA_VID(pSma)); - } else { - smaWarn("vgId:%d, rsma persist task in stat %" PRIi8, SMA_VID(pSma), atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); - } - - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), smaMgmt.rsetId, pRSmaStat->refId); - tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__); - taosThreadExit(NULL); - return NULL; -} - -static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { - TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED); - TdThread tid; - - if (taosThreadCreate(&tid, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) { - if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_INACTIVE, - TASK_TRIGGER_STAT_ACTIVE)) { - smaDebug("vgId:%d, persist task is active again", SMA_VID(pRSmaStat->pSma)); - } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_CANCELLED, - TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("vgId:%d, persist task is cancelled and set finished", SMA_VID(pRSmaStat->pSma)); - } else { - smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, SMA_VID(pRSmaStat->pSma), - atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); - } - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d)", SMA_VID(pRSmaStat->pSma), smaMgmt.rsetId, - pRSmaStat->refId); - tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__); - } - - taosThreadAttrDestroy(&thAttr); -} - -/** - * @brief trigger to persist rsma qTaskInfo - * - * @param param - * @param tmrId - */ -static void tdRSmaPersistTrigger(void *param, void *tmrId) { - SRSmaStat *rsmaStat = param; - SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.rsetId, rsmaStat->refId); - ASSERT(0); - if (!pRSmaStat) { - smaDebug("rsma persistence task not start since already destroyed"); - return; - } - - int8_t tmrStat = - atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); - switch (tmrStat) { - case TASK_TRIGGER_STAT_ACTIVE: { - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 1); - if (TASK_TRIGGER_STAT_CANCELLED != atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_CANCELLED, - TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("vgId:%d, rsma persistence start since active", SMA_VID(pRSmaStat->pSma)); - - // start persist task - tdRSmaPersistTask(pRSmaStat); - - // taosTmrReset(tdRSmaPersistTrigger, 5000, pRSmaStat, pRSmaStat->tmrHandle, - // RSMA_TMR_ID(pRSmaStat)); - } else { - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - } - return; - } break; - case TASK_TRIGGER_STAT_CANCELLED: { - atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED); - smaDebug("rsma persistence not start since cancelled and finished"); - } break; - case TASK_TRIGGER_STAT_PAUSED: { - smaDebug("rsma persistence not start since paused"); - } break; - case TASK_TRIGGER_STAT_INACTIVE: { - smaDebug("rsma persistence not start since inactive"); - } break; - case TASK_TRIGGER_STAT_INIT: { - smaDebug("rsma persistence not start since init"); - } break; - default: { - smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); - } break; - } - taosReleaseRef(smaMgmt.rsetId, rsmaStat->refId); -} - /** * @brief trigger to get rsma result * @@ -1314,8 +1223,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat)); switch (rsmaTriggerStat) { case TASK_TRIGGER_STAT_PAUSED: - case TASK_TRIGGER_STAT_CANCELLED: - case TASK_TRIGGER_STAT_FINISHED: { + case TASK_TRIGGER_STAT_CANCELLED: { tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 " refId:%d", @@ -1328,7 +1236,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); - ASSERT(pRSmaInfo->suid > 0); + ASSERT(pRSmaInfo->items[pItem->level].level == pItem->level); int8_t fetchTriggerStat = atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c55edc4942e571ee9831a25c1622095b42e384ae..d25ae817c7a700217d066ede669cb28a8b5da845 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -417,7 +417,7 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p goto _err; } - if (tdProcessRSmaCreate(pVnode, &req) < 0) { + if (tdProcessRSmaCreate(pVnode->pSma, &req) < 0) { pRsp->code = terrno; goto _err; } @@ -573,6 +573,11 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe goto _exit; } + if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) { + rcode = terrno; + goto _exit; + } + // return rsp _exit: pRsp->code = rcode;