diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 31296e0201fbdf9a0fecb0e4cf5668926b2fd316..7a57b0a2c0857cc71e3882733746ee8f8420d70c 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -82,16 +82,17 @@ void dnodeRead(SRpcMsg *pMsg) { dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle); - if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { - queuedMsgNum = 0; - } - while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - pVnode = vnodeGetVnode(pHead->vgId); + if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { + pVnode = vnodeGetVnode(pHead->vgId); + } else { + pVnode = vnodeAccquireVnode(pHead->vgId); + } + if (pVnode == NULL) { leftLen -= pHead->contLen; pCont -= pHead->contLen; @@ -261,7 +262,6 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { rpcSendResponse(&rpcRsp); dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle); - vnodeRelease(pVnode); } else { pQInfo = pMsg->pCont; } diff --git a/src/inc/vnode.h b/src/inc/vnode.h index b459c2c5625c0424f4df0094eb2a3bc8e97a6e7e..9861d1a2ff29a9167aed1a2a7480980a1d16b061 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -31,7 +31,8 @@ int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeClose(int32_t vgId); void vnodeRelease(void *pVnode); -void* vnodeGetVnode(int32_t vgId); +void* vnodeAccquireVnode(int32_t vgId); // add refcount +void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged void* vnodeGetRqueue(void *); void* vnodeGetWqueue(int32_t vgId); diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 6dc7fd00a9a5736e3937cf05948dbaa729330353..d0352da3b1a349a118a4b8a4d97de4fddf65b61b 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -219,6 +219,13 @@ void *vnodeGetVnode(int32_t vgId) { return NULL; } + return pVnode; +} + +void *vnodeAccquireVnode(int32_t vgId) { + SVnodeObj *pVnode = vnodeGetVnode(vgId); + if (pVnode == NULL) return pVnode; + atomic_add_fetch_32(&pVnode->refCount, 1); dTrace("pVnode:%p vgId:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount); @@ -230,7 +237,7 @@ void *vnodeGetRqueue(void *pVnode) { } void *vnodeGetWqueue(int32_t vgId) { - SVnodeObj *pVnode = vnodeGetVnode(vgId); + SVnodeObj *pVnode = vnodeAccquireVnode(vgId); if (pVnode == NULL) return NULL; return pVnode->wqueue; }