From b9033a75015e1c74e592867483446759ca7b67e7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 20 Jan 2022 10:36:06 +0800 Subject: [PATCH] feature/qnode --- source/libs/qworker/inc/qworkerInt.h | 1 + source/libs/qworker/src/qworker.c | 47 +++++++++++++++++++++++----- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 193287eeac..7d043a0e02 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -105,6 +105,7 @@ typedef struct SQWTaskCtx { SRWLatch lock; int8_t phase; + bool emptyRes; int8_t queryContinue; int8_t inQueue; int32_t rspCode; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 50a6158631..c6e2bad421 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -532,6 +532,20 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void bool queryEnd = false; int32_t code = 0; + if (ctx->emptyRes) { + QW_TASK_DLOG("query empty result, query end, phase:%d", ctx->phase); + + QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); + + QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + + *rspMsg = rsp; + *dataLen = 0; + pOutput->queryEnd = true; + + return TSDB_CODE_SUCCESS; + } + dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); if (len < 0) { @@ -644,6 +658,10 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S ctx->taskHandle = input->taskHandle; ctx->sinkHandle = input->sinkHandle; + + if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) { + ctx->emptyRes = true; + } if (input->code) { QW_SET_RSP_CODE(ctx, input->code); @@ -885,12 +903,14 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t QW_ERR_JRET(code); } - QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, TSDB_CODE_SUCCESS)); + QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code)); + QW_TASK_DLOG("query msg rsped, code:%d", code); queryRsped = true; - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle)); - + if (pTaskInfo && sinkHandle) { + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle)); + } _return: if (code) { @@ -899,6 +919,10 @@ _return: if (!queryRsped) { code = qwBuildAndSendQueryRsp(qwMsg->connection, rspCode); + if (TSDB_CODE_SUCCESS == code) { + QW_TASK_DLOG("query msg rsped, code:%d", rspCode); + } + if (TSDB_CODE_SUCCESS == rspCode && code) { rspCode = code; } @@ -921,7 +945,8 @@ _return: QW_ERR_RET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output)); if (queryRsped && output.needRsp) { - qwBuildAndSendReadyRsp(qwMsg->connection, output.rspCode); + qwBuildAndSendReadyRsp(qwMsg->connection, output.rspCode); + QW_TASK_DLOG("ready msg rsped, code:%x", output.rspCode); } QW_RET(rspCode); @@ -940,9 +965,11 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t if (phase == QW_PHASE_PRE_QUERY) { QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY); + QW_TASK_DLOG("ready msg not rsped, phase:%d", phase); } else if (phase == QW_PHASE_POST_QUERY) { QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); QW_ERR_JRET(qwBuildAndSendReadyRsp(qwMsg->connection, ctx->rspCode)); + QW_TASK_DLOG("ready msg rsped, code:%x", ctx->rspCode); } else { QW_TASK_ELOG("invalid phase when got ready msg, phase:%d", phase); assert(0); @@ -1009,7 +1036,8 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); + qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); + QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); } else { atomic_store_8(&ctx->queryContinue, 1); } @@ -1022,6 +1050,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t qwFreeFetchRsp(rsp); rsp = NULL; qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); + QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, 0); } input.code = code; @@ -1072,7 +1101,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t qwBuildFetchRsp(rsp, &sOutput, dataLen); } - if ((!sOutput.queryEnd) && (/* DS_BUF_LOW == sOutput.bufStatus || */ DS_BUF_EMPTY == sOutput.bufStatus)) { + if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); QW_LOCK(QW_WRITE, &ctx->lock); @@ -1103,12 +1132,14 @@ _return: if (code) { qwFreeFetchRsp(rsp); rsp = NULL; - qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); + dataLen = 0; + qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); + QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); } else if (rsp) { qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); + QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); } - QW_RET(code); } -- GitLab