diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ebccb7950cf537d255881a81443e5723873a1ab3..d77e42388b454d6a6d6b3c39ee68d8b3963a7dda 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { return TSDB_CODE_SUCCESS; } -int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { +int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t code = 0; bool qcontinue = true; SSDataBlock *pRes = NULL; @@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); - if (queryEnd) { - *queryEnd = true; + if (queryStop) { + *queryStop = true; } break; @@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue); if (!qcontinue) { + if (queryStop) { + *queryStop = true; + } + break; } @@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { SQWPhaseInput input = {0}; void *rsp = NULL; int32_t dataLen = 0; - bool queryEnd = false; + bool queryStop = false; do { QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); @@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { atomic_store_8((int8_t *)&ctx->queryInQueue, 0); atomic_store_8((int8_t *)&ctx->queryContinue, 0); - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop)); if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; @@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } QW_LOCK(QW_WRITE, &ctx->lock); - if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { + if (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { // Note: query is not running anymore QW_SET_PHASE(ctx, 0); QW_UNLOCK(QW_WRITE, &ctx->lock); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index a606311f3c3676141a401b891eb142335244a6ee..0c19e4a2fe06f477f3e52eb4fcecf0541ba181d5 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -224,7 +224,7 @@ # ---- stream ./test.sh -f tsim/stream/basic0.sim -#./test.sh -f tsim/stream/basic1.sim +./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/drop_stream.sim ./test.sh -f tsim/stream/distributeInterval0.sim