未验证 提交 f817ba16 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #20501 from taosdata/fix/TS-2823-2.6

TS-2823 delete msg wait thread fixed
......@@ -164,22 +164,12 @@ void dnodeFreeVWriteQueue(void *pWqueue) {
void* waitingResultThread(void* param) {
SVWriteMsg* pWrite = (SVWriteMsg* )param;
// wait AddWaitThread to list finished
dDebug(":SDEL pVnode:%p wait AddWaitThread finished... pWrite=%p", pWrite->pVnode, pWrite);
tsem_t* psem = vnodeSemWait(pWrite->pVnode);
tsem_wait(psem);
tsem_post(psem);
dDebug(":SDEL pVnode:%p wait AddWaitThread ok pWrite=%p", pWrite->pVnode, pWrite);
// wait request deal finished
int32_t ret = tsem_wait(pWrite->rspRet.psem);
dDebug(":SDEL pVnode:%p wait request ok pWrite=%p", pWrite->pVnode, pWrite);
if(ret == 0) {
// success
}
dInfo(":SDEL pVnode:%p start wait commit pWrite=%p", pWrite->pVnode, pWrite);
tsem_wait(pWrite->rspRet.psem);
tsem_destroy(pWrite->rspRet.psem);
tfree(pWrite->rspRet.psem);
dInfo(":SDEL pVnode:%p end wait commit pWrite=%p", pWrite->pVnode, pWrite);
// wait ok
SRpcMsg rpcRsp = {
......@@ -223,14 +213,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
rpcSendResponse(&rpcRsp);
vnodeFreeFromWQueue(pVnode, pWrite);
} else {
tsem_t* psem = vnodeSemWait(pVnode);
tsem_wait(psem);
// need async to wait result in another thread
// first add to list
vnodeAddWait(pVnode, NULL, pWrite->rspRet.psem, pWrite);
pthread_t* thread = taosCreateThread(waitingResultThread, pWrite);
// add to wait thread manager
vnodeAddWait(pVnode, thread, pWrite->rspRet.psem, pWrite);
dDebug(":SDEL pVnode=%p vnode add wait %p ok, tsem_post.", pVnode, pWrite);
tsem_post(psem);
// set thread
vnodeSetWait(pVnode, thread, pWrite);
}
}
}
......
......@@ -106,10 +106,11 @@ int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
// wait thread
void vnodeAddWait(void* pVnode, pthread_t* pthread, tsem_t* psem, void* param);
// vnode set wait pthread
void vnodeSetWait(void* vparam, pthread_t* pthread, void* param);
void vnodeRemoveWait(void* pVnode, void* param);
// get wait thread count
bool vnodeWaitTooMany(void* vparam);
tsem_t* vnodeSemWait(void* vparam);
#ifdef __cplusplus
}
......
......@@ -198,7 +198,7 @@ static void *tsdbLoopCommit(void *arg) {
} else if (req == COMMIT_BOTH_REQ) {
SControlDataInfo* pCtlDataInfo = (SControlDataInfo* )param;
if(!pCtlDataInfo->memNull) {
tsdbInfo(":SDEL vgId=%d commit mem before delete data. mem=%p imem=%p \n", REPO_ID(pRepo), pRepo->mem, pRepo->imem);
tsdbInfo("vgId:%d :SDEL commit mem before delete data. mem=%p imem=%p \n", REPO_ID(pRepo), pRepo->mem, pRepo->imem);
tsdbCommitData(pRepo, false);
}
tsdbCommitControl(pRepo, param);
......
......@@ -646,8 +646,39 @@ void vnodeAddWait(void* vparam, pthread_t* pthread, tsem_t* psem, void* param) {
waitThread.param = param;
// append
tsem_wait(&pVnode->semWait);
tdListAppend(pVnode->waitThreads, &waitThread);
vDebug("vgId:%d :SDEL add wait thread %p wait list count=%d ", pVnode->vgId, param, listNEles(pVnode->waitThreads));
tsem_post(&pVnode->semWait);
vInfo("vgId:%d :SDEL add ok. pWrite=%p list count=%d", pVnode->vgId, param, listNEles(pVnode->waitThreads));
}
// vnode set wait pthread
void vnodeSetWait(void* vparam, pthread_t* pthread, void* param) {
SVnodeObj* pVnode = (SVnodeObj* )vparam;
SListIter iter = {0};
bool found = false;
tsem_wait(&pVnode->semWait);
tdListInitIter(pVnode->waitThreads, &iter, TD_LIST_FORWARD);
while (1) {
SListNode* pNode = tdListNext(&iter);
if (pNode == NULL)
break;
SWaitThread * pWaitThread = (SWaitThread *)pNode->data;
if (pWaitThread->param == param) {
// found
pWaitThread->pthread = pthread;
found = true;
break;
}
}
tsem_post(&pVnode->semWait);
if(!found) {
vInfo("vgId:%d :SDEL vnodeSetWait no found. maybe thread finished. pWrite=%p", pVnode->vgId, param);
}
}
// called in wait thread
......@@ -687,9 +718,4 @@ bool vnodeWaitTooMany(void* vparam) {
}
return false;
}
tsem_t* vnodeSemWait(void* vparam) {
SVnodeObj* pVnode = (SVnodeObj* )vparam;
return &pVnode->semWait;
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册