From 1477ce97a70f989f7accc201c89e373b79baff99 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 3 Jan 2023 18:07:18 +0800 Subject: [PATCH] fix: drop task while task executing issue --- include/libs/executor/executor.h | 2 ++ source/libs/executor/src/executor.c | 9 +++++++++ source/libs/qworker/src/qworker.c | 15 +++++++++++++-- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index cfd5bd1ed7..63e1c556de 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -154,6 +154,8 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); */ int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); +bool qTaskIsExecuting(qTaskInfo_t qinfo); + /** * destroy query info structure * @param qHandle diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e5ff104d5c..814ead57f0 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -704,6 +704,15 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return TSDB_CODE_SUCCESS; } +bool qTaskIsExecuting(qTaskInfo_t qinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; + if (NULL == pTaskInfo) { + return false; + } + + return 0 != atomic_load_64(&pTaskInfo->owner); +} + static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) { STaskCostInfo* pSummary = &pTaskInfo->cost; int64_t idleTime = pSummary->start - pSummary->created; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index cf6e251d72..dcb7c02580 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -202,6 +202,15 @@ _return: QW_RET(code); } +bool qwTaskNotInExec(SQWTaskCtx *ctx) { + qTaskInfo_t taskHandle = ctx->taskHandle; + if (NULL == taskHandle || !qTaskIsExecuting(taskHandle)) { + return true; + } + + return false; +} + int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t taskNum = 0; @@ -508,8 +517,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp } if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - QW_ERR_JRET(ctx->rspCode); + if (QW_PHASE_POST_FETCH != phase || qwTaskNotInExec(ctx)) { + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); + QW_ERR_JRET(ctx->rspCode); + } } if (ctx->rspCode) { -- GitLab