diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index cfd5bd1ed77bc1a0c4f158f00cb8c2462ac70443..63e1c556deeae912c010c849218dec514e09be44 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -154,6 +154,8 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); */ int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); +bool qTaskIsExecuting(qTaskInfo_t qinfo); + /** * destroy query info structure * @param qHandle diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e5ff104d5c94d018c2f5cbf53fec1d686adbf4a9..814ead57f0b51428f223f65b6fe97bec51f5c8ad 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -704,6 +704,15 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return TSDB_CODE_SUCCESS; } +bool qTaskIsExecuting(qTaskInfo_t qinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; + if (NULL == pTaskInfo) { + return false; + } + + return 0 != atomic_load_64(&pTaskInfo->owner); +} + static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) { STaskCostInfo* pSummary = &pTaskInfo->cost; int64_t idleTime = pSummary->start - pSummary->created; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index cf6e251d723a401a22860cbe877578d4b79a1afe..dcb7c02580654ae629722f556e252d199eeec4cc 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -202,6 +202,15 @@ _return: QW_RET(code); } +bool qwTaskNotInExec(SQWTaskCtx *ctx) { + qTaskInfo_t taskHandle = ctx->taskHandle; + if (NULL == taskHandle || !qTaskIsExecuting(taskHandle)) { + return true; + } + + return false; +} + int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t taskNum = 0; @@ -508,8 +517,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp } if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - QW_ERR_JRET(ctx->rspCode); + if (QW_PHASE_POST_FETCH != phase || qwTaskNotInExec(ctx)) { + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); + QW_ERR_JRET(ctx->rspCode); + } } if (ctx->rspCode) {