提交 ff647b96 编写于 作者: C Cary Xu

enh: rsma fetch logic optimization

上级 834c9eda
...@@ -143,7 +143,7 @@ struct SRSmaInfoItem { ...@@ -143,7 +143,7 @@ struct SRSmaInfoItem {
int8_t level : 4; int8_t level : 4;
int8_t fetchLevel : 4; int8_t fetchLevel : 4;
int8_t triggerStat; int8_t triggerStat;
uint16_t nSkipped; uint16_t nScanned;
int32_t maxDelay; // ms int32_t maxDelay; // ms
tmr_h tmrId; tmr_h tmrId;
}; };
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
#define RSMA_FETCH_DELAY_MAX (900000) // ms #define RSMA_FETCH_DELAY_MAX (120000) // ms
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms #define RSMA_FETCH_ACTIVE_MAX (1000) // ms
#define RSMA_FETCH_INTERVAL (5000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms
...@@ -1712,21 +1712,23 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { ...@@ -1712,21 +1712,23 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
continue; continue;
} }
int64_t curMs = taosGetTimestampMs(); if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
if ((pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch executed",
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay);
} else if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) {
++pItem->nSkipped;
smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
continue;
} else { } else {
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", int64_t curMs = taosGetTimestampMs();
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); if ((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX) {
smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); // restore the active stat
continue;
} else {
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
}
} }
pItem->nSkipped = 0; pItem->nScanned = 0;
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err; goto _err;
...@@ -1737,12 +1739,12 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { ...@@ -1737,12 +1739,12 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
} }
tdCleanupStreamInputDataBlock(taskInfo); tdCleanupStreamInputDataBlock(taskInfo);
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished", smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch finished",
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
} else { } else {
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8
" maxDelay:%d, fetch not executed as fetch level is %" PRIi8, " maxDelay:%d, fetch not executed as fetch level is %" PRIi8,
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay, pItem->fetchLevel); SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel);
} }
} }
...@@ -1832,7 +1834,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ...@@ -1832,7 +1834,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
bool occupied = (batchMax <= 1); bool occupied = (batchMax <= 1);
if (batchMax > 1) { if (batchMax > 1) {
batchMax = 100 / batchMax; batchMax = 100 / batchMax;
batchMax = MAX(batchMax, 4); batchMax = TMAX(batchMax, 4);
} }
while (occupied || (++batchCnt < batchMax)) { // greedy mode while (occupied || (++batchCnt < batchMax)) { // greedy mode
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册