提交 6d28d01b 编写于 作者: S Shengliang Guan

TD-2165

上级 8b8c7948
...@@ -24,7 +24,7 @@ extern "C" { ...@@ -24,7 +24,7 @@ extern "C" {
int32_t vnodeInitCWorker(); int32_t vnodeInitCWorker();
void vnodeCleanupCWorker(); void vnodeCleanupCWorker();
int32_t vnodeWriteIntoCQueue(SVReadMsg *pRead); int32_t vnodeWriteIntoCQueue(SVnodeObj *pVnode, SVReadMsg *pRead);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -120,17 +120,21 @@ void vnodeCleanupCWorker() { ...@@ -120,17 +120,21 @@ void vnodeCleanupCWorker() {
vnodeStopCWorker(); vnodeStopCWorker();
} }
int32_t vnodeWriteIntoCQueue(SVReadMsg *pRead) { int32_t vnodeWriteIntoCQueue(SVnodeObj *pVnode, SVReadMsg *pRead) {
vTrace("msg:%p, write into vcqueue", pRead); atomic_add_fetch_32(&pVnode->refCount, 1);
pRead->pVnode = pVnode;
vTrace("vgId:%d, write into vcqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
return taosWriteQitem(tsVCWorkerQueue, pRead->qtype, pRead); return taosWriteQitem(tsVCWorkerQueue, pRead->qtype, pRead);
} }
static void vnodeFreeFromCQueue(SVReadMsg *pRead) { static void vnodeFreeFromCQueue(SVnodeObj *pVnode, SVReadMsg *pRead) {
vTrace("msg:%p, free from vcqueue", pRead); vTrace("vgId:%d, free from vcqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
taosFreeQitem(pRead); taosFreeQitem(pRead);
vnodeRelease(pVnode);
} }
static void vnodeSendVCancelRpcRsp(SVReadMsg *pRead, int32_t code) { static void vnodeSendVCancelRpcRsp(SVnodeObj *pVnode, SVReadMsg *pRead, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pRead->rpcHandle, .handle = pRead->rpcHandle,
.pCont = pRead->rspRet.rsp, .pCont = pRead->rspRet.rsp,
...@@ -139,7 +143,7 @@ static void vnodeSendVCancelRpcRsp(SVReadMsg *pRead, int32_t code) { ...@@ -139,7 +143,7 @@ static void vnodeSendVCancelRpcRsp(SVReadMsg *pRead, int32_t code) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
vnodeFreeFromCQueue(pRead); vnodeFreeFromCQueue(pVnode, pRead);
} }
static void *vnodeCWorkerFunc(void *param) { static void *vnodeCWorkerFunc(void *param) {
...@@ -153,13 +157,12 @@ static void *vnodeCWorkerFunc(void *param) { ...@@ -153,13 +157,12 @@ static void *vnodeCWorkerFunc(void *param) {
break; break;
} }
vTrace("msg:%p will be processed in vcworker queue", pRead);
assert(qtype == TAOS_QTYPE_RPC); assert(qtype == TAOS_QTYPE_RPC);
assert(pVnode == NULL); assert(pVnode == NULL);
assert(pRead->pVnode != NULL);
int32_t code = vnodeProcessRead(NULL, pRead);
vnodeSendVCancelRpcRsp(pRead, code); int32_t code = vnodeProcessRead(pRead->pVnode, pRead);
vnodeSendVCancelRpcRsp(pRead->pVnode, pRead, code);
} }
return NULL; return NULL;
......
...@@ -116,9 +116,9 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt ...@@ -116,9 +116,9 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
} }
pRead->qtype = qtype; pRead->qtype = qtype;
if (pRead->msgType == TSDB_MSG_TYPE_CM_KILL_QUERY) { if (pRead->msgType == TSDB_MSG_TYPE_CM_KILL_QUERY) {
return vnodeWriteIntoCQueue(pRead); return vnodeWriteIntoCQueue(pVnode, pRead);
} else { } else {
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
atomic_add_fetch_32(&pVnode->queuedRMsg, 1); atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册