提交 74ea5ff4 编写于 作者: S Shengliang Guan

add func dnodePutQhandleIntoReadQueue to simplify the refCount of vnode

上级 636aa8d2
......@@ -179,6 +179,16 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads
}
void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;
taos_queue queue = vnodeAccquireRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
}
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
......@@ -219,9 +229,14 @@ static void *dnodeProcessReadQueue(void *param) {
break;
}
dDebug("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], type);
int32_t code = vnodeProcessRead(pVnode, pReadMsg);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
if (type == TAOS_QTYPE_RPC) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
}
taosFreeQitem(pReadMsg);
}
......
......@@ -53,6 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue);
void *dnodeAllocateVnodeRqueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue);
void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue();
......
......@@ -365,6 +365,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TAOS_QTYPE_FWD 1
#define TAOS_QTYPE_WAL 2
#define TAOS_QTYPE_CQ 3
#define TAOS_QTYPE_QUERY 4
typedef enum {
TSDB_SUPER_TABLE = 0, // super table
......
......@@ -52,6 +52,7 @@ void vnodeRelease(void *pVnode);
void* vnodeAccquireVnode(int32_t vgId); // add refcount
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged
void* vnodeAccquireRqueue(void *);
void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId);
void* vnodeGetWal(void *pVnode);
......
......@@ -393,6 +393,15 @@ void *vnodeAccquireVnode(int32_t vgId) {
return pVnode;
}
void *vnodeAccquireRqueue(void *param) {
SVnodeObj *pVnode = param;
if (pVnode == NULL) return NULL;
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
return ((SVnodeObj *)pVnode)->rqueue;
}
void *vnodeGetRqueue(void *pVnode) {
return ((SVnodeObj *)pVnode)->rqueue;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册