提交 ff2b545b 编写于 作者: wmmhello's avatar wmmhello

fix:set task status killed when vnode receive subscribe msg from mnode

上级 f7ab8dab
......@@ -90,6 +90,8 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
*/
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
void qSetTaskCode(qTaskInfo_t tinfo, int32_t code);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
// todo refactor
......
......@@ -394,7 +394,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
tqSetHandleIdle(pHandle);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
return code;
......@@ -574,7 +574,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// kill executing task
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
if (pTaskInfo != NULL) {
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
qKillTask(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
}
taosWLockLatch(&pTq->lock);
......
......@@ -180,6 +180,11 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
doSetTaskId(pTaskInfo->pRoot);
}
void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
SExecTaskInfo* pTaskInfo = tinfo;
pTaskInfo->code = code;
}
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册