diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 729ac474e4720a13a2da8d463820d05db72970f0..08e3bb5aefdeefa8a95460395bda689958a6fc35 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -138,6 +138,7 @@ typedef struct SQWTaskCtx { int32_t execId; int32_t level; + bool queryGotData; bool queryRsped; bool queryEnd; bool queryContinue; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f006096ce20a45e18a5b9d990c9c63b621638ac5..d57b8e2e3368e9053759d15fac209e36334b7b1b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -471,8 +471,10 @@ _return: if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC); + ctx->queryGotData = true; } +#if 0 if (QW_PHASE_POST_QUERY == phase && ctx) { ctx->queryRsped = true; @@ -485,6 +487,7 @@ _return: QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); } } +#endif if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); @@ -531,6 +534,15 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { _return: +#if 1 + qwBuildAndSendQueryRsp(qwMsg->msgType + 1, &qwMsg->connInfo, code, ctx); + if (ctx) { + ctx->queryRsped = true; + ctx->phase = -1; + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); + } +#endif + if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); qwReleaseTaskCtx(mgmt, ctx); @@ -600,10 +612,31 @@ _return: input.msgType = qwMsg->msgType; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - // if (!queryRsped) { - // qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); - // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - //} + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { + void *rsp = NULL; + int32_t dataLen = 0; + SOutputData sOutput = {0}; + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + + if (rsp) { + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + if (qComplete) { + atomic_store_8((int8_t *)&ctx->queryEnd, true); + } + + qwMsg->connInfo = ctx->dataConnInfo; + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); + + qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code); + rsp = NULL; + + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, + tstrerror(code), dataLen); + } + } + QW_RET(TSDB_CODE_SUCCESS); } @@ -726,7 +759,9 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { locked = true; // RC WARNING - if (QW_QUERY_RUNNING(ctx)) { + if (-1 == ctx->phase || false == ctx->queryGotData) { + QW_TASK_DLOG_E("task query unfinished"); + } else if (QW_QUERY_RUNNING(ctx)) { atomic_store_8((int8_t *)&ctx->queryContinue, 1); } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9cab39c30122072207daa9e9639ab92645fc1633..4ac3961aa7729021fb487666795ea35742910ca9 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -867,9 +867,11 @@ int32_t schLaunchTaskImpl(void *param) { SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); +#if 0 if (SCH_IS_QUERY_JOB(pJob)) { SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask)); } +#endif SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));