提交 50e7e033 编写于 作者: C Cary Xu

enh: rsma batch process

上级 053d00e6
......@@ -351,7 +351,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
nLoops = 0;
}
}
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId());
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
#if 0 // consuming task of qTaskInfo clone
// step 4: swap queue/qall and iQueue/iQall
......@@ -391,13 +395,14 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
#if 0
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
// perform persist task for qTaskInfo operator
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
#endif
return TSDB_CODE_SUCCESS;
}
......@@ -421,8 +426,8 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
// lock
// taosWLockLatch(SMA_ENV_LOCK(pEnv));
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
void *pIter = NULL;
while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
......@@ -440,14 +445,13 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
SMA_VID(pSma), refVal, *pSuid);
}
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
continue;
}
#if 0
if (pRSmaInfo->taskInfo[0]) {
if (pRSmaInfo->iTaskInfo[0]) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
pRSmaInfo->iTaskInfo[0] = NULL;
}
} else {
......@@ -456,8 +460,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
#endif
}
for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) {
......
......@@ -19,9 +19,9 @@
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
#define RSMA_FETCH_DELAY_MAX (1800000) // ms
#define RSMA_FETCH_SKIP_MAX (1000) // cnt
#define RSMA_FETCH_ACTIVE_MAX (1800) // ms
#define RSMA_FETCH_DELAY_MAX (180000) // ms
#define RSMA_FETCH_SKIP_MAX (10) // cnt
#define RSMA_FETCH_ACTIVE_MAX (180) // ms
SSmaMgmt smaMgmt = {
.inited = 0,
......@@ -163,8 +163,8 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
pInfo->iQall = NULL;
}
taosMemoryFree(pInfo);
}
taosMemoryFree(pInfo);
}
return NULL;
}
......@@ -968,13 +968,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
goto _err;
}
void *pIter = taosHashIterate(uidStore.uidHash, NULL);
while (pIter) {
void *pIter = NULL;
while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) {
goto _err;
}
pIter = taosHashIterate(uidStore.uidHash, pIter);
}
if (tdRSmaExecCheck(pSma) < 0) {
......@@ -1418,7 +1417,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
#if 0
qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i);
#endif
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i);
if (!taskInfo) {
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
continue;
......@@ -1644,24 +1646,27 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm
}
int64_t curMs = taosGetTimestampMs();
if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
pItem->nSkipped = 0;
smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed",
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay);
} else {
if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) {
++pItem->nSkipped;
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
continue;
} else {
smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
}
}
// if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
// pItem->nSkipped = 0;
// smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed",
// SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay);
// } else {
// if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) {
// ++pItem->nSkipped;
// smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
// SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
// continue;
// } else {
// smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
// SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
// }
// }
pItem->lastFetch = curMs;
// smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
// SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err;
}
......@@ -1817,25 +1822,16 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
ASSERT(0);
}
smaInfo("prop:vgId:%d loop end check", SMA_VID(pSma));
if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
if (pVnode->inClose) {
smaInfo("prop:vgId:%d loop end check - inClose and break", SMA_VID(pSma));
break;
}
smaInfo("prop:vgId:%d loop end check - wait for notEmpty", SMA_VID(pSma));
tsem_wait(&pRSmaStat->notEmpty);
smaInfo("prop:vgId:%d loop end check - received notEmpty", SMA_VID(pSma));
if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
smaInfo("prop:vgId:%d loop end check - break - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose,
atomic_load_64(&pRSmaStat->nBufItems));
break;
} else {
smaInfo("prop:vgId:%d loop end check - continue - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma),
pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems));
}
} else {
smaInfo("prop:vgId:%d loop end check - continue to run", SMA_VID(pSma));
}
} // end of while(true)
......
......@@ -375,6 +375,9 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (!pInfo->iTaskInfo[i]) {
continue;
}
if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
goto _err;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册