提交 f14b1381 编写于 作者: S slguan

[TD-116] add vnodeAccquire vnode

上级 64edcaf1
......@@ -82,16 +82,17 @@ void dnodeRead(SRpcMsg *pMsg) {
dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle);
if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
queuedMsgNum = 0;
}
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
pVnode = vnodeGetVnode(pHead->vgId);
if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
pVnode = vnodeGetVnode(pHead->vgId);
} else {
pVnode = vnodeAccquireVnode(pHead->vgId);
}
if (pVnode == NULL) {
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
......@@ -261,7 +262,6 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
rpcSendResponse(&rpcRsp);
dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle);
vnodeRelease(pVnode);
} else {
pQInfo = pMsg->pCont;
}
......
......@@ -31,7 +31,8 @@ int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode);
void* vnodeGetVnode(int32_t vgId);
void* vnodeAccquireVnode(int32_t vgId); // add refcount
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged
void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId);
......
......@@ -219,6 +219,13 @@ void *vnodeGetVnode(int32_t vgId) {
return NULL;
}
return pVnode;
}
void *vnodeAccquireVnode(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId);
if (pVnode == NULL) return pVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("pVnode:%p vgId:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount);
......@@ -230,7 +237,7 @@ void *vnodeGetRqueue(void *pVnode) {
}
void *vnodeGetWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId);
SVnodeObj *pVnode = vnodeAccquireVnode(vgId);
if (pVnode == NULL) return NULL;
return pVnode->wqueue;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册