diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index ca5367f39714ed1f3a979068b0a9a7204d385f8c..0e644be2889b79b853a7ca8e69bd4b0e162b61b2 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -312,15 +312,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); + int32_t nLoops = 0; // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); - atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1); + while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) { + ++nLoops; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; ASSERT(pRSmaStat->commitAppliedVer > 0); // step 2: wait for all triggered fetch tasks to finish - int32_t nLoops = 0; + while (1) { if (T_REF_VAL_GET(pStat) == 0) { smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma)); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 426ab521fdd16e3d0907fd1c056da15d7d6937cc..52b08d131c48160f9e871a68ce200acc142c95c0 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -21,6 +21,7 @@ #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt #define RSMA_FETCH_DELAY_MAX (900000) // ms #define RSMA_FETCH_ACTIVE_MAX (1800) // ms +#define RSMA_FETCH_INTERVAL (5000) // ms SSmaMgmt smaMgmt = { .inited = 0, @@ -1501,13 +1502,13 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { switch (rsmaTriggerStat) { case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_CANCELLED: { - tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { - taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); } + tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); return; } default: @@ -1518,7 +1519,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); switch (fetchTriggerStat) { case TASK_TRIGGER_STAT_ACTIVE: { - smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", + smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process pItem->fetchLevel = pItem->level; @@ -1531,8 +1532,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { if (atomic_load_8(&pRSmaInfo->assigned) == 0) { tsem_post(&(pStat->notEmpty)); } - smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level, - pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_PAUSED: { smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused", @@ -1715,15 +1714,25 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); } - if (type == RSMA_EXEC_OVERFLOW) { + if (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2) == 0) { tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); + atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); } if (qallItemSize > 0) { atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); continue; } else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { - continue; + if (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0) { + continue; + } + for (int32_t j = 0; j < TSDB_RETENTION_L2; ++j) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, j); + if (pItem->fetchLevel) { + pItem->fetchLevel = 0; + taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + } + } } break; @@ -1775,7 +1784,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if (pEnv->flag & SMA_ENV_FLG_CLOSE) { break; } - + tsem_wait(&pRSmaStat->notEmpty); if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index 8cd376e0920c2c1e4bfaf89a2d3a087c7cf0ac5a..84b615af7a93aef9fbf86190a2544474b7b2c87b 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -295,11 +295,7 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke } if (*pIter == (void *)GET_SHASH_NODE_DATA(pNode)) { - if (!pPrev) { - *pIter = NULL; - } else { - *pIter = GET_SHASH_NODE_DATA(pPrev); - } + *pIter = pPrev ? GET_SHASH_NODE_DATA(pPrev) : NULL; } FREE_HASH_NODE(pNode); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index a2d65d6a542f72eae239f15c79be7c64c8df3bd1..06ebbf27fb1dc2e32d6b9ce26b735df827de8f96 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -97,7 +97,7 @@ int32_t tqDebugFlag = 135; int32_t fsDebugFlag = 135; int32_t metaDebugFlag = 135; int32_t udfDebugFlag = 135; -int32_t smaDebugFlag = 135; +int32_t smaDebugFlag = 131; int32_t idxDebugFlag = 135; int64_t dbgEmptyW = 0;