提交 5531a5a1 编写于 作者: C Cary Xu

enh: rsam fetch all logic optimization

上级 2a458629
......@@ -95,6 +95,7 @@ struct SRSmaStat {
int64_t refId; // shared by fetch tasks
volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int32_t nFetchAll; // active number of fetch all
int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
......
......@@ -1714,9 +1714,14 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type);
}
if (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2) == 0) {
int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
if (oldStat == 0 ||
((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
}
}
if (qallItemSize > 0) {
......
......@@ -80,7 +80,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead
goto _err;
}
pReader->pQTaskFReader->pReadH = qTaskF;
#if 0
#if 1
SQTaskFile* pQTaskF = &pReader->pQTaskFReader->fTask;
pQTaskF->nRef = 1;
#endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册