diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index fe949d92cec218ad90b59407e572c158b2d55efb..f5f7218ab9f663b6142f7ce3aefb8f6b80d20509 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -90,7 +90,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 */ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); -void qSetTaskCode(qTaskInfo_t tinfo, int32_t code); +//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 85d8d073f7f4073135b8cb556c9bb9b983f93701..17eac7d096c3b2affd71ac49106770555252c9b2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -370,7 +370,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { bool exec = tqIsHandleExec(pHandle); if(!exec) { tqSetHandleExec(pHandle); - qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); +// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); taosWUnLockLatch(&pTq->lock); break; @@ -572,16 +572,16 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // atomic_add_fetch_32(&pHandle->epoch, 1); // kill executing task - if(tqIsHandleExec(pHandle)) { - qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - if (pTaskInfo != NULL) { - qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - } +// if(tqIsHandleExec(pHandle)) { +// qTaskInfo_t pTaskInfo = pHandle->execHandle.task; +// if (pTaskInfo != NULL) { +// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); +// } // if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { // qStreamCloseTsdbReader(pTaskInfo); // } - } +// } // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); taosWUnLockLatch(&pTq->lock); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index eea542e042794fccfb1c372fe5a42db621556150..a73deffa528c71999e036370b3a3b021b861f024 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -180,10 +180,10 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { doSetTaskId(pTaskInfo->pRoot); } -void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { - SExecTaskInfo* pTaskInfo = tinfo; - pTaskInfo->code = code; -} +//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { +// SExecTaskInfo* pTaskInfo = tinfo; +// pTaskInfo->code = code; +//} int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { if (tinfo == NULL) {