From 8c6ea3513045ebec2ae5ab293708df7e012cb49a Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 16 Mar 2023 20:41:04 +0800 Subject: [PATCH] fix: adjust wait sem order changed for delete response. --- src/dnode/src/dnodeVWrite.c | 27 +++++++------------------ src/inc/vnode.h | 3 ++- src/vnode/src/vnodeMain.c | 40 ++++++++++++++++++++++++++++++------- 3 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index a9d648793d..a2a4947a20 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -164,22 +164,12 @@ void dnodeFreeVWriteQueue(void *pWqueue) { void* waitingResultThread(void* param) { SVWriteMsg* pWrite = (SVWriteMsg* )param; - - // wait AddWaitThread to list finished - dDebug(":SDEL pVnode:%p wait AddWaitThread finished... pWrite=%p", pWrite->pVnode, pWrite); - tsem_t* psem = vnodeSemWait(pWrite->pVnode); - tsem_wait(psem); - tsem_post(psem); - dDebug(":SDEL pVnode:%p wait AddWaitThread ok pWrite=%p", pWrite->pVnode, pWrite); - // wait request deal finished - int32_t ret = tsem_wait(pWrite->rspRet.psem); - dDebug(":SDEL pVnode:%p wait request ok pWrite=%p", pWrite->pVnode, pWrite); - if(ret == 0) { - // success - } + dInfo(":SDEL pVnode:%p start wait commit pWrite=%p", pWrite->pVnode, pWrite); + tsem_wait(pWrite->rspRet.psem); tsem_destroy(pWrite->rspRet.psem); tfree(pWrite->rspRet.psem); + dInfo(":SDEL pVnode:%p end wait commit pWrite=%p", pWrite->pVnode, pWrite); // wait ok SRpcMsg rpcRsp = { @@ -223,14 +213,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { rpcSendResponse(&rpcRsp); vnodeFreeFromWQueue(pVnode, pWrite); } else { - tsem_t* psem = vnodeSemWait(pVnode); - tsem_wait(psem); - // need async to wait result in another thread + // first add to list + vnodeAddWait(pVnode, NULL, pWrite->rspRet.psem, pWrite); pthread_t* thread = taosCreateThread(waitingResultThread, pWrite); - // add to wait thread manager - vnodeAddWait(pVnode, thread, pWrite->rspRet.psem, pWrite); - dDebug(":SDEL pVnode=%p vnode add wait %p ok, tsem_post.", pVnode, pWrite); - tsem_post(psem); + // set thread + vnodeSetWait(pVnode, thread, pWrite); } } } diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 41f3eca08b..7f15ee78e4 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -106,10 +106,11 @@ int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); // wait thread void vnodeAddWait(void* pVnode, pthread_t* pthread, tsem_t* psem, void* param); +// vnode set wait pthread +void vnodeSetWait(void* vparam, pthread_t* pthread, void* param); void vnodeRemoveWait(void* pVnode, void* param); // get wait thread count bool vnodeWaitTooMany(void* vparam); -tsem_t* vnodeSemWait(void* vparam); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index f215453f74..3d87e12b14 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -646,8 +646,39 @@ void vnodeAddWait(void* vparam, pthread_t* pthread, tsem_t* psem, void* param) { waitThread.param = param; // append + tsem_wait(&pVnode->semWait); tdListAppend(pVnode->waitThreads, &waitThread); - vDebug("vgId:%d :SDEL add wait thread %p wait list count=%d ", pVnode->vgId, param, listNEles(pVnode->waitThreads)); + tsem_post(&pVnode->semWait); + vInfo("vgId:%d :SDEL add ok. pWrite=%p list count=%d", pVnode->vgId, param, listNEles(pVnode->waitThreads)); +} + +// vnode set wait pthread +void vnodeSetWait(void* vparam, pthread_t* pthread, void* param) { + SVnodeObj* pVnode = (SVnodeObj* )vparam; + SListIter iter = {0}; + bool found = false; + + tsem_wait(&pVnode->semWait); + tdListInitIter(pVnode->waitThreads, &iter, TD_LIST_FORWARD); + + while (1) { + SListNode* pNode = tdListNext(&iter); + if (pNode == NULL) + break; + + SWaitThread * pWaitThread = (SWaitThread *)pNode->data; + if (pWaitThread->param == param) { + // found + pWaitThread->pthread = pthread; + found = true; + break; + } + } + + tsem_post(&pVnode->semWait); + if(!found) { + vInfo("vgId:%d :SDEL vnodeSetWait no found. maybe thread finished. pWrite=%p", pVnode->vgId, param); + } } // called in wait thread @@ -687,9 +718,4 @@ bool vnodeWaitTooMany(void* vparam) { } return false; -} - -tsem_t* vnodeSemWait(void* vparam) { - SVnodeObj* pVnode = (SVnodeObj* )vparam; - return &pVnode->semWait; -} +} \ No newline at end of file -- GitLab