From 74ea5ff492af8a8a264ac599b609eb072556ab8a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 3 Jul 2020 19:44:15 +0800 Subject: [PATCH] add func dnodePutQhandleIntoReadQueue to simplify the refCount of vnode --- src/dnode/src/dnodeVRead.c | 19 +++++++++++++++++-- src/inc/dnode.h | 1 + src/inc/taosdef.h | 1 + src/inc/vnode.h | 1 + src/vnode/src/vnodeMain.c | 9 +++++++++ 5 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 6bbb291b6a..947d0fa501 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -179,6 +179,16 @@ void dnodeFreeVnodeRqueue(void *rqueue) { // dynamically adjust the number of threads } +void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle) { + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; + pRead->pCont = qhandle; + pRead->contLen = 0; + + taos_queue queue = vnodeAccquireRqueue(pVnode); + taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead); +} + static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg = pMsg->rpcMsg; @@ -219,9 +229,14 @@ static void *dnodeProcessReadQueue(void *param) { break; } - dDebug("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); + dDebug("%p, msg:%s will be processed in vread queue, qtype:%d", pReadMsg->rpcMsg.ahandle, + taosMsg[pReadMsg->rpcMsg.msgType], type); int32_t code = vnodeProcessRead(pVnode, pReadMsg); - dnodeSendRpcReadRsp(pVnode, pReadMsg, code); + + if (type == TAOS_QTYPE_RPC) { + dnodeSendRpcReadRsp(pVnode, pReadMsg, code); + } + taosFreeQitem(pReadMsg); } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index b561c407a3..1d33dafbaa 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -53,6 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); void dnodeFreeVnodeWqueue(void *queue); void *dnodeAllocateVnodeRqueue(void *pVnode); void dnodeFreeVnodeRqueue(void *rqueue); +void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); int32_t dnodeAllocateMnodePqueue(); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 76ca99c9ad..e4ee058cef 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -365,6 +365,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TAOS_QTYPE_FWD 1 #define TAOS_QTYPE_WAL 2 #define TAOS_QTYPE_CQ 3 +#define TAOS_QTYPE_QUERY 4 typedef enum { TSDB_SUPER_TABLE = 0, // super table diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 9f0c8cc241..49bd67a04f 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -52,6 +52,7 @@ void vnodeRelease(void *pVnode); void* vnodeAccquireVnode(int32_t vgId); // add refcount void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged +void* vnodeAccquireRqueue(void *); void* vnodeGetRqueue(void *); void* vnodeGetWqueue(int32_t vgId); void* vnodeGetWal(void *pVnode); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 5eb78fda52..4c446a78ec 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -393,6 +393,15 @@ void *vnodeAccquireVnode(int32_t vgId) { return pVnode; } +void *vnodeAccquireRqueue(void *param) { + SVnodeObj *pVnode = param; + if (pVnode == NULL) return NULL; + + atomic_add_fetch_32(&pVnode->refCount, 1); + vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount); + return ((SVnodeObj *)pVnode)->rqueue; +} + void *vnodeGetRqueue(void *pVnode) { return ((SVnodeObj *)pVnode)->rqueue; } -- GitLab