提交 f9e0aad8 编写于 作者: C Cary Xu

enh: rsma batch process

上级 50e7e033
...@@ -91,10 +91,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -91,10 +91,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pQueryQ)) { while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
taosMsleep(10);
dInfo("prop:vgId:%d, query queue size is %d", pVnode->vgId, taosQueueItemSize(pVnode->pQueryQ));
}
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
dTrace("vgId:%d, vnode queue is empty", pVnode->vgId); dTrace("vgId:%d, vnode queue is empty", pVnode->vgId);
......
...@@ -316,6 +316,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -316,6 +316,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
// step 1: set rsma stat // step 1: set rsma stat
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1);
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
ASSERT(pRSmaStat->commitAppliedVer > 0);
// step 2: wait all triggered fetch tasks finished // step 2: wait all triggered fetch tasks finished
int32_t nLoops = 0; int32_t nLoops = 0;
...@@ -378,8 +380,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -378,8 +380,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
// unlock // unlock
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); // taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
#endif #endif
// step 5: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -152,7 +152,6 @@ int32_t smaPreClose(SSma *pSma) { ...@@ -152,7 +152,6 @@ int32_t smaPreClose(SSma *pSma) {
for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) { for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) {
tsem_post(&(pRSmaStat->notEmpty)); tsem_post(&(pRSmaStat->notEmpty));
} }
smaInfo("prop:vgId:%d post notEmtpy", SMA_VID(pSma));
} }
return 0; return 0;
} }
......
...@@ -1769,7 +1769,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ...@@ -1769,7 +1769,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
int64_t itemSize = 0; int64_t itemSize = 0;
if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel ||
RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
smaDebug("vgId:%d queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); smaDebug("vgId:%d, queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type);
if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
int32_t qallItemSize = taosQallItemSize(pInfo->qall); int32_t qallItemSize = taosQallItemSize(pInfo->qall);
...@@ -1828,7 +1828,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ...@@ -1828,7 +1828,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
} }
tsem_wait(&pRSmaStat->notEmpty); tsem_wait(&pRSmaStat->notEmpty);
if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
smaInfo("prop:vgId:%d loop end check - break - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, smaInfo("vgId:%d, exec task end, inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose,
atomic_load_64(&pRSmaStat->nBufItems)); atomic_load_64(&pRSmaStat->nBufItems));
break; break;
} }
......
...@@ -220,13 +220,6 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -220,13 +220,6 @@ int vnodeCommit(SVnode *pVnode) {
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied); pVnode->state.applied);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
smaAsyncPreCommit(pVnode->pSma);
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
pVnode->state.commitTerm = pVnode->state.applyTerm; pVnode->state.commitTerm = pVnode->state.applyTerm;
// save info // save info
...@@ -241,6 +234,16 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -241,6 +234,16 @@ int vnodeCommit(SVnode *pVnode) {
} }
walBeginSnapshot(pVnode->pWal, pVnode->state.applied); walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
if(smaAsyncPreCommit(pVnode->pSma) < 0){
ASSERT(0);
return -1;
}
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
// commit each sub-system // commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) { if (metaCommit(pVnode->pMeta) < 0) {
ASSERT(0); ASSERT(0);
...@@ -248,7 +251,10 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -248,7 +251,10 @@ int vnodeCommit(SVnode *pVnode) {
} }
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
smaAsyncCommit(pVnode->pSma); if (smaAsyncCommit(pVnode->pSma) < 0) {
ASSERT(0);
return -1;
}
if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
ASSERT(0); ASSERT(0);
...@@ -285,7 +291,10 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -285,7 +291,10 @@ int vnodeCommit(SVnode *pVnode) {
// postCommit // postCommit
// smaSyncPostCommit(pVnode->pSma); // smaSyncPostCommit(pVnode->pSma);
smaAsyncPostCommit(pVnode->pSma); if (smaAsyncPostCommit(pVnode->pSma) < 0) {
ASSERT(0);
return -1;
}
// apply the commit (TODO) // apply the commit (TODO)
walEndSnapshot(pVnode->pWal); walEndSnapshot(pVnode->pWal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册