diff --git a/src/vnode/inc/vnodeRead.h b/src/vnode/inc/vnodeRead.h index f5375d6ab0c19277e2c39a987fd4428fae1885e1..0e9655f837e311fd8d7d8779ce915e5b152be288 100644 --- a/src/vnode/inc/vnodeRead.h +++ b/src/vnode/inc/vnodeRead.h @@ -27,7 +27,7 @@ void vnodeCleanupRead(void); int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam); void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); -void vnodeWaitReadCompleted(void *pVnode); +void vnodeWaitReadCompleted(SVnodeObj *pVnode); #ifdef __cplusplus } diff --git a/src/vnode/inc/vnodeWrite.h b/src/vnode/inc/vnodeWrite.h index 5238e45b81fc7955e592970fd6634199940470e0..e996bc0b06f4563b24e5ebdfc90420b9e67df003 100644 --- a/src/vnode/inc/vnodeWrite.h +++ b/src/vnode/inc/vnodeWrite.h @@ -27,7 +27,7 @@ void vnodeCleanupWrite(void); int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg); void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); -void vnodeWaitWriteCompleted(void *pVnode); +void vnodeWaitWriteCompleted(SVnodeObj *pVnode); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index c864bc995bb58e81535111b1f2c7654699578930..41e631a24ffced5fdf197269a00f131630ca04b6 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -436,4 +436,9 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg)); } -void vnodeWaitReadCompleted(void *pVnode) {} \ No newline at end of file +void vnodeWaitReadCompleted(SVnodeObj *pVnode) { + while (pVnode->queuedRMsg > 0) { + vTrace("vgId:%d, queued rmsg num:%d", pVnode->vgId, pVnode->queuedWMsg); + taosMsleep(10); + } +} \ No newline at end of file diff --git a/src/vnode/src/vnodeStatus.c b/src/vnode/src/vnodeStatus.c index c016b78396772f917e5eded4e73d375c345a1d55..11f79fb372e45199626ccdb9d97c6a4065de7296 100644 --- a/src/vnode/src/vnodeStatus.c +++ b/src/vnode/src/vnodeStatus.c @@ -18,6 +18,8 @@ #include "taosmsg.h" #include "query.h" #include "vnodeStatus.h" +#include "vnodeRead.h" +#include "vnodeWrite.h" char* vnodeStatus[] = { "init", @@ -77,6 +79,8 @@ bool vnodeSetClosingStatus(SVnodeObj* pVnode) { // release local resources only after cutting off outside connections qQueryMgmtNotifyClosed(pVnode->qMgmt); + vnodeWaitReadCompleted(pVnode); + vnodeWaitWriteCompleted(pVnode); return true; } @@ -121,6 +125,8 @@ bool vnodeSetResetStatus(SVnodeObj* pVnode) { // release local resources only after cutting off outside connections qQueryMgmtNotifyClosed(pVnode->qMgmt); + vnodeWaitReadCompleted(pVnode); + vnodeWaitWriteCompleted(pVnode); return true; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 5c2e871eb646a006484bcdc34995bda70a3ede8d..4f6ce9d2e431d4998252741bf475d1549af0eb7a 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -345,4 +345,9 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { } } -void vnodeWaitWriteCompleted(void *pVnode) {} \ No newline at end of file +void vnodeWaitWriteCompleted(SVnodeObj *pVnode) { + while (pVnode->queuedWMsg > 0) { + vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg); + taosMsleep(10); + } +}