diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index a9eca6467557d161e22ed8bf42ff2c97feaf889c..a0e04b6a19bd16cb98415278ea8748406e1cfa89 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -127,6 +127,7 @@ typedef struct SQWTaskCtx { bool queryRsped; bool queryEnd; bool queryContinue; + bool queryExecDone; bool queryInQueue; int32_t rspCode; int64_t affectedRows; // for insert ...select stmt diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index aad9a52126541dc4132eb18d5740366433fe29cf..5bfc9b7444997b551f521a4d1a9df2b8546e6981 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -59,6 +59,8 @@ static void freeItem(void *param) { int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { qTaskInfo_t taskHandle = ctx->taskHandle; + ctx->queryExecDone = true; + if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) { if (ctx->explain) { SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); @@ -116,6 +118,14 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { DataSinkHandle sinkHandle = ctx->sinkHandle; SLocalFetch localFetch = {(void *)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes}; + if (ctx->queryExecDone) { + if (queryStop) { + *queryStop = true; + } + + return TSDB_CODE_SUCCESS; + } + SArray *pResList = taosArrayInit(4, POINTER_BYTES); while (true) { QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);