diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index b7e6c42e3b7ede77237aa7e8e910b45f236b5cc9..2598b5c28cd24c5eaf04a45355a714c7540872ed 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -90,6 +90,8 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 */ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); +void qSetTaskCode(qTaskInfo_t tinfo, int32_t code); + int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); // todo refactor diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6f619e9bbe6d22057cb371ad61acb8ffcddb01a4..9e43c4a9446004de62c94603abf1885368f56c65 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -394,7 +394,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); - + qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); tqSetHandleIdle(pHandle); tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); return code; @@ -574,7 +574,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // kill executing task qTaskInfo_t pTaskInfo = pHandle->execHandle.task; if (pTaskInfo != NULL) { - qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + qKillTask(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); } taosWLockLatch(&pTq->lock); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c4a56d78aebbe0f6a89e77473ba6791ed61c0212..eea542e042794fccfb1c372fe5a42db621556150 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -180,6 +180,11 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { doSetTaskId(pTaskInfo->pRoot); } +void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { + SExecTaskInfo* pTaskInfo = tinfo; + pTaskInfo->code = code; +} + int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR;