From ff2b545b27afa6f28f704d896910c04e727d19c8 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 19 May 2023 17:16:47 +0800 Subject: [PATCH] fix:set task status killed when vnode receive subscribe msg from mnode --- include/libs/executor/executor.h | 2 ++ source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/executor/src/executor.c | 5 +++++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index b7e6c42e3b..2598b5c28c 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -90,6 +90,8 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 */ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); +void qSetTaskCode(qTaskInfo_t tinfo, int32_t code); + int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); // todo refactor diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6f619e9bbe..9e43c4a944 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -394,7 +394,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); - + qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); tqSetHandleIdle(pHandle); tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); return code; @@ -574,7 +574,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // kill executing task qTaskInfo_t pTaskInfo = pHandle->execHandle.task; if (pTaskInfo != NULL) { - qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + qKillTask(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); } taosWLockLatch(&pTq->lock); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c4a56d78ae..eea542e042 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -180,6 +180,11 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { doSetTaskId(pTaskInfo->pRoot); } +void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { + SExecTaskInfo* pTaskInfo = tinfo; + pTaskInfo->code = code; +} + int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; -- GitLab