未验证 提交 c6fe02cf 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16505 from taosdata/feature/3.0_interval_hash_optimize

enh: rsma fetch all logic optimization
...@@ -95,6 +95,7 @@ struct SRSmaStat { ...@@ -95,6 +95,7 @@ struct SRSmaStat {
int64_t refId; // shared by fetch tasks int64_t refId; // shared by fetch tasks
volatile int64_t nBufItems; // number of items in queue buffer volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int32_t nFetchAll; // active number of fetch all
int8_t triggerStat; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing int8_t commitStat; // 0 not in committing, 1 in committing
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
......
...@@ -312,7 +312,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -312,7 +312,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
int32_t nLoops = 0; int32_t nLoops = 0;
// step 1: set rsma stat // step 1: set rsma stat
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
...@@ -327,7 +327,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -327,7 +327,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
ASSERT(pRSmaStat->commitAppliedVer > 0); ASSERT(pRSmaStat->commitAppliedVer > 0);
// step 2: wait for all triggered fetch tasks to finish // step 2: wait for all triggered fetch tasks to finish
nLoops = 0;
while (1) { while (1) {
if (T_REF_VAL_GET(pStat) == 0) { if (T_REF_VAL_GET(pStat) == 0) {
smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma)); smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma));
...@@ -351,7 +351,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -351,7 +351,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma),
(void *)taosGetSelfPthreadId());
nLoops = 0; nLoops = 0;
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
++nLoops; ++nLoops;
...@@ -366,7 +367,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -366,7 +367,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
} }
smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
#if 0 // consuming task of qTaskInfo clone #if 0 // consuming task of qTaskInfo clone
// step 4: swap queue/qall and iQueue/iQall // step 4: swap queue/qall and iQueue/iQall
// lock // lock
// taosWLockLatch(SMA_ENV_LOCK(pEnv)); // taosWLockLatch(SMA_ENV_LOCK(pEnv));
......
...@@ -1714,9 +1714,14 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ...@@ -1714,9 +1714,14 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type);
} }
if (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2) == 0) { int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
if (oldStat == 0 ||
((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
}
} }
if (qallItemSize > 0) { if (qallItemSize > 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册