提交 c7c255e9 编写于 作者: wmmhello's avatar wmmhello

fix:set task status killed when vnode receive subscribe msg from mnode

上级 ff2b545b
...@@ -370,6 +370,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -370,6 +370,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
bool exec = tqIsHandleExec(pHandle); bool exec = tqIsHandleExec(pHandle);
if(!exec) { if(!exec) {
tqSetHandleExec(pHandle); tqSetHandleExec(pHandle);
qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
break; break;
...@@ -394,7 +395,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -394,7 +395,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
tqSetHandleIdle(pHandle); tqSetHandleIdle(pHandle);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
return code; return code;
...@@ -571,15 +571,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -571,15 +571,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// atomic_store_32(&pHandle->epoch, 0); // atomic_store_32(&pHandle->epoch, 0);
} }
taosWLockLatch(&pTq->lock);
// kill executing task // kill executing task
qTaskInfo_t pTaskInfo = pHandle->execHandle.task; if(tqIsHandleExec(pHandle)) {
if (pTaskInfo != NULL) { qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
qKillTask(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); if (pTaskInfo != NULL) {
} qKillTask(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
}
taosWLockLatch(&pTq->lock); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { qStreamCloseTsdbReader(pTaskInfo);
qStreamCloseTsdbReader(pTaskInfo); }
} }
// remove if it has been register in the push manager, and return one empty block to consumer // remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle); tqUnregisterPushHandle(pTq, pHandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册