From 24d12c6087d40c8342f93ec1cabab39ccbcf40f6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 16 Mar 2022 16:43:17 +0800 Subject: [PATCH] feature/scheduler --- source/libs/catalog/inc/catalogInt.h | 14 +- source/libs/catalog/src/catalog.c | 14 +- source/libs/qworker/inc/qworkerInt.h | 19 +- source/libs/qworker/src/qworker.c | 568 +++++++++------------------ source/libs/qworker/src/qworkerMsg.c | 2 +- 5 files changed, 205 insertions(+), 412 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 83e663bdd7..837bac1f1b 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -58,10 +58,10 @@ enum { }; typedef struct SCtgDebug { - bool lockDebug; - bool cacheDebug; - bool apiDebug; - bool metaDebug; + bool lockEnable; + bool cacheEnable; + bool apiEnable; + bool metaEnable; uint32_t showCachePeriodSec; } SCtgDebug; @@ -242,9 +242,9 @@ typedef struct SCtgAction { #define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__) -#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) -#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0) -#define CTG_API_DEBUG(...) do { if (gCTGDebug.apiDebug) { qDebug(__VA_ARGS__); } } while (0) +#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0) +#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheEnable) { qDebug(__VA_ARGS__); } } while (0) +#define CTG_API_DEBUG(...) do { if (gCTGDebug.apiEnable) { qDebug(__VA_ARGS__); } } while (0) #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 7f95bd5579..16b4931eb5 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -55,25 +55,25 @@ SCtgAction gCtgAction[CTG_ACT_MAX] = {{ int32_t ctgDbgEnableDebug(char *option) { if (0 == strcasecmp(option, "lock")) { - gCTGDebug.lockDebug = true; + gCTGDebug.lockEnable = true; qDebug("lock debug enabled"); return TSDB_CODE_SUCCESS; } if (0 == strcasecmp(option, "cache")) { - gCTGDebug.cacheDebug = true; + gCTGDebug.cacheEnable = true; qDebug("cache debug enabled"); return TSDB_CODE_SUCCESS; } if (0 == strcasecmp(option, "api")) { - gCTGDebug.apiDebug = true; + gCTGDebug.apiEnable = true; qDebug("api debug enabled"); return TSDB_CODE_SUCCESS; } if (0 == strcasecmp(option, "meta")) { - gCTGDebug.metaDebug = true; + gCTGDebug.metaEnable = true; qDebug("api debug enabled"); return TSDB_CODE_SUCCESS; } @@ -155,7 +155,7 @@ int32_t ctgDbgGetClusterCacheNum(SCatalog* pCtg, int32_t type) { } void ctgDbgShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) { - if (!gCTGDebug.metaDebug) { + if (!gCTGDebug.metaEnable) { return; } @@ -177,7 +177,7 @@ void ctgDbgShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) { } void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) { - if (NULL == dbHash || !gCTGDebug.cacheDebug) { + if (NULL == dbHash || !gCTGDebug.cacheEnable) { return; } @@ -217,7 +217,7 @@ void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) { void ctgDbgShowClusterCache(SCatalog* pCtg) { - if (!gCTGDebug.cacheDebug || NULL == pCtg) { + if (!gCTGDebug.cacheEnable || NULL == pCtg) { return; } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 4bd71f0fbd..1bfa2395ce 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -59,23 +59,15 @@ enum { QW_WRITE, }; -enum { - QW_EXIST_ACQUIRE = 1, - QW_EXIST_RET_ERR, -}; enum { QW_NOT_EXIST_RET_ERR = 1, QW_NOT_EXIST_ADD, }; -enum { - QW_ADD_RET_ERR = 1, - QW_ADD_ACQUIRE, -}; - typedef struct SQWDebug { - int32_t lockDebug; + bool lockEnable; + bool statusEnable; } SQWDebug; typedef struct SQWMsg { @@ -91,14 +83,10 @@ typedef struct SQWHbInfo { } SQWHbInfo; typedef struct SQWPhaseInput { - int8_t taskStatus; - int8_t taskType; int32_t code; } SQWPhaseInput; typedef struct SQWPhaseOutput { - int32_t rspCode; - bool needStop; } SQWPhaseOutput; @@ -118,7 +106,6 @@ typedef struct SQWTaskCtx { void *cancelConnection; bool emptyRes; - bool multiExec; int8_t queryContinue; int8_t queryInQueue; int32_t rspCode; @@ -198,7 +185,7 @@ typedef struct SQWorkerMgmt { #define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) -#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) +#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0) #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 468c744dba..eb586cb871 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -9,33 +9,21 @@ #include "tname.h" #include "dataSinkMgt.h" -SQWDebug gQWDebug = {0}; +SQWDebug gQWDebug = {.statusEnable = true}; -char *qwPhaseStr(int32_t phase) { - switch (phase) { - case QW_PHASE_PRE_QUERY: - return "PRE_QUERY"; - case QW_PHASE_POST_QUERY: - return "POST_QUERY"; - case QW_PHASE_PRE_FETCH: - return "PRE_FETCH"; - case QW_PHASE_POST_FETCH: - return "POST_FETCH"; - case QW_PHASE_PRE_CQUERY: - return "PRE_CQUERY"; - case QW_PHASE_POST_CQUERY: - return "POST_CQUERY"; - default: - break; +int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { + if (!gQWDebug.statusEnable) { + return TSDB_CODE_SUCCESS; } - - return "UNKNOWN"; -} - -int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) { + int32_t code = 0; if (oriStatus == newStatus) { + if (newStatus == JOB_TASK_STATUS_EXECUTING || newStatus == JOB_TASK_STATUS_FAILED) { + *ignore = true; + return TSDB_CODE_SUCCESS; + } + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -105,14 +93,55 @@ _return: QW_RET(code); } + +char *qwPhaseStr(int32_t phase) { + switch (phase) { + case QW_PHASE_PRE_QUERY: + return "PRE_QUERY"; + case QW_PHASE_POST_QUERY: + return "POST_QUERY"; + case QW_PHASE_PRE_FETCH: + return "PRE_FETCH"; + case QW_PHASE_POST_FETCH: + return "POST_FETCH"; + case QW_PHASE_PRE_CQUERY: + return "PRE_CQUERY"; + case QW_PHASE_POST_CQUERY: + return "POST_CQUERY"; + default: + break; + } + + return "UNKNOWN"; +} + +char *qwBufStatusStr(int32_t bufStatus) { + switch (bufStatus) { + case DS_BUF_LOW: + return "LOW"; + case DS_BUF_FULL: + return "FULL"; + case DS_BUF_EMPTY: + return "EMPTY"; + default: + break; + } + + return "UNKNOWN"; +} + int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { int32_t code = 0; int8_t origStatus = 0; + bool ignore = false; while (true) { origStatus = atomic_load_8(&task->status); - QW_ERR_RET(qwValidateStatus(QW_FPARAMS(), origStatus, status)); + QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore)); + if (ignore) { + break; + } if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) { continue; @@ -337,18 +366,12 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); } -int32_t qwAddGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { - return qwAddTaskCtxImpl(QW_FPARAMS(), false, ctx); -} - - void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) { - //QW_UNLOCK(rwType, &mgmt->ctxLock); taosHashRelease(mgmt->ctxHash, ctx); } void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) { - // RC WARNING + // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { qDestroyTask(otaskHandle); @@ -357,7 +380,7 @@ void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) { int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t code = 0; - // RC WARNING + // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { code = qAsyncKillTask(taskHandle); @@ -378,8 +401,7 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { } -// Note: NEED CTX HASH LOCKED BEFORE ENTRANCE -int32_t qwDropTaskCtx(QW_FPARAMS_DEF, int32_t rwType) { +int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); SQWTaskCtx octx; @@ -396,29 +418,18 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF, int32_t rwType) { QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); - if (rwType) { - QW_UNLOCK(rwType, &ctx->lock); - } - if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { QW_TASK_ELOG_E("taosHashRemove from ctx hash failed"); QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } - if (octx.taskHandle) { - qDestroyTask(octx.taskHandle); - } - - if (octx.sinkHandle) { - dsDestroyDataSinker(octx.sinkHandle); - } + qwFreeTask(QW_FPARAMS(), &octx); QW_TASK_DLOG_E("task ctx dropped"); return TSDB_CODE_SUCCESS; } - int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; @@ -472,6 +483,13 @@ _return: QW_RET(code); } +int32_t qwDropTask(QW_FPARAMS_DEF) { + QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS())); + QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS())); + + return TSDB_CODE_SUCCESS; +} + int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t code = 0; bool qcontinue = true; @@ -487,14 +505,15 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { code = qExecTask(*taskHandle, &pRes, &useconds); if (code) { - QW_TASK_ELOG("qExecTask failed, code:%s", tstrerror(code)); - QW_ERR_JRET(code); + QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_RET(code); } ++execNum; if (NULL == pRes) { - QW_TASK_DLOG("task query done, useconds:%"PRIu64, useconds); + QW_TASK_DLOG("qExecTask end with empty res, useconds:%"PRIu64, useconds); + dsEndPut(sinkHandle, useconds); if (TASK_TYPE_TEMP == ctx->taskType) { @@ -513,8 +532,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { SInputData inputData = {.pData = pRes}; code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); if (code) { - QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code)); - QW_ERR_JRET(code); + QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_RET(code); } QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); @@ -532,8 +551,6 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { } } -_return: - QW_RET(code); } @@ -589,10 +606,9 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void int32_t code = 0; if (ctx->emptyRes) { - QW_TASK_DLOG("query empty result, query end, phase:%d", ctx->phase); + QW_TASK_DLOG_E("query end with empty result"); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); *rspMsg = rsp; @@ -613,14 +629,13 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void if (queryEnd) { code = dsGetDataBlock(ctx->sinkHandle, pOutput); if (code) { - QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code); + QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_ERR_RET(code); } - QW_TASK_DLOG("no data in sink and query end, phase:%d", ctx->phase); + QW_TASK_DLOG_E("no data in sink and query end"); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); *rspMsg = rsp; *dataLen = 0; @@ -629,18 +644,14 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void } pOutput->bufStatus = DS_BUF_EMPTY; - - QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd); return TSDB_CODE_SUCCESS; } - // Got data from sink + QW_TASK_DLOG("there are data in sink, dataLength:%d", len); *dataLen = len; - - QW_TASK_DLOG("task got data in sink, dataLength:%d", len); QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); *rspMsg = rsp; @@ -648,7 +659,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void pOutput->pData = rsp->data; code = dsGetDataBlock(ctx->sinkHandle, pOutput); if (code) { - QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code); + QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_ERR_RET(code); } @@ -662,9 +673,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; - int8_t status = 0; SQWTaskCtx *ctx = NULL; - bool locked = false; void *dropConnection = NULL; void *cancelConnection = NULL; @@ -677,193 +686,92 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } QW_LOCK(QW_WRITE, &ctx->lock); - locked = true; + + if (QW_PHASE_PRE_FETCH != phase) { + atomic_store_8(&ctx->phase, phase); + } switch (phase) { case QW_PHASE_PRE_QUERY: { - atomic_store_8(&ctx->phase, phase); - - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { - QW_TASK_ELOG("task already cancelled/dropped at wrong phase %s", qwPhaseStr(phase)); - - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR; + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase)); + QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); break; } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; - QW_SET_RSP_CODE(ctx, output->rspCode); dropConnection = ctx->dropConnection; - - // Note: ctx freed, no need to unlock it - locked = false; - - break; - } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); - - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - QW_SET_RSP_CODE(ctx, output->rspCode); - - cancelConnection = ctx->cancelConnection; - + QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); break; } - if (ctx->rspCode) { - QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); - output->needStop = true; - output->rspCode = ctx->rspCode; - QW_ERR_JRET(output->rspCode); - } - - if (!output->needStop) { - QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); - } + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); break; } case QW_PHASE_PRE_FETCH: { - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase)); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); - } - - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_ELOG("drop event at wrong phase %s", qwPhaseStr(phase)); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); - } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_ELOG("cancel event at wrong phase %s", qwPhaseStr(phase)); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); - } - - if (ctx->rspCode) { - QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); - output->needStop = true; - output->rspCode = ctx->rspCode; - QW_ERR_JRET(output->rspCode); - } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { QW_TASK_WLOG("last fetch not finished, phase:%s", qwPhaseStr(phase)); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_DUPLICATTED_OPERATION; QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) { QW_TASK_ELOG("query rsp are not ready, phase:%s", qwPhaseStr(phase)); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR; QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); } break; } case QW_PHASE_PRE_CQUERY: { - atomic_store_8(&ctx->phase, phase); - - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase)); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); - } - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; - output->needStop = true; - QW_SET_RSP_CODE(ctx, output->rspCode); dropConnection = ctx->dropConnection; - - // Note: ctx freed, no need to unlock it - locked = false; - - QW_ERR_JRET(output->rspCode); - } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); - qwFreeTask(QW_FPARAMS(), ctx); - - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); - - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - QW_SET_RSP_CODE(ctx, output->rspCode); - cancelConnection = ctx->cancelConnection; - - QW_ERR_JRET(output->rspCode); + QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } - - if (ctx->rspCode) { - QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); - output->needStop = true; - output->rspCode = ctx->rspCode; - QW_ERR_JRET(output->rspCode); - } break; - } + } + default: + QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase)); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + if (ctx->rspCode) { + QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); + QW_ERR_JRET(ctx->rspCode); } _return: if (ctx) { - if (output->rspCode) { - QW_UPDATE_RSP_CODE(ctx, output->rspCode); - } + QW_UPDATE_RSP_CODE(ctx, code); - if (locked) { - QW_UNLOCK(QW_WRITE, &ctx->lock); - } - + QW_UNLOCK(QW_WRITE, &ctx->lock); qwReleaseTaskCtx(mgmt, ctx); } - if (code) { - output->needStop = true; - if (TSDB_CODE_SUCCESS == output->rspCode) { - output->rspCode = code; - } - } - if (dropConnection) { - qwBuildAndSendDropRsp(dropConnection, output->rspCode); - QW_TASK_DLOG("drop msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); + qwBuildAndSendDropRsp(dropConnection, code); + QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code)); } if (cancelConnection) { - qwBuildAndSendCancelRsp(cancelConnection, output->rspCode); - QW_TASK_DLOG("cancel msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); + qwBuildAndSendCancelRsp(cancelConnection, code); + QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code)); } - QW_TASK_DLOG("end to handle event at phase %s", qwPhaseStr(phase)); + QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code)); QW_RET(code); } @@ -871,39 +779,21 @@ _return: int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; - int8_t status = 0; SQWTaskCtx *ctx = NULL; - bool locked = false; void *readyConnection = NULL; void *dropConnection = NULL; void *cancelConnection = NULL; QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); - - output->needStop = false; QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); - locked = true; if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } - - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase)); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); - } - - if (input->code) { - output->rspCode = input->code; - } if (QW_PHASE_POST_QUERY == phase) { if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) { @@ -917,84 +807,57 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); - - output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; - output->needStop = true; - QW_SET_RSP_CODE(ctx, output->rspCode); - dropConnection = ctx->dropConnection; + if (QW_PHASE_POST_FETCH == phase) { + QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase)); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } - // Note: ctx freed, no need to unlock it - locked = false; - - QW_ERR_JRET(output->rspCode); - } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); - qwFreeTask(QW_FPARAMS(), ctx); + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); - - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - QW_SET_RSP_CODE(ctx, output->rspCode); - cancelConnection = ctx->cancelConnection; - - QW_ERR_JRET(output->rspCode); + dropConnection = ctx->dropConnection; + QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } if (ctx->rspCode) { - QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); - output->needStop = true; - output->rspCode = ctx->rspCode; - QW_ERR_JRET(output->rspCode); + QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); + QW_ERR_JRET(ctx->rspCode); } - if (QW_PHASE_POST_QUERY == phase && (!output->needStop)) { - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->taskStatus)); - } + QW_ERR_JRET(input->code); _return: if (ctx) { - if (output->rspCode) { - QW_UPDATE_RSP_CODE(ctx, output->rspCode); - } + QW_UPDATE_RSP_CODE(ctx, code); if (QW_PHASE_POST_FETCH != phase) { atomic_store_8(&ctx->phase, phase); } - if (locked) { - QW_UNLOCK(QW_WRITE, &ctx->lock); - } - + QW_UNLOCK(QW_WRITE, &ctx->lock); qwReleaseTaskCtx(mgmt, ctx); } - if (code) { - output->needStop = true; - if (TSDB_CODE_SUCCESS == output->rspCode) { - output->rspCode = code; - } - } - if (readyConnection) { - qwBuildAndSendReadyRsp(readyConnection, output->rspCode); - QW_TASK_DLOG("ready msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); + qwBuildAndSendReadyRsp(readyConnection, code); + QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code)); } if (dropConnection) { - qwBuildAndSendDropRsp(dropConnection, output->rspCode); - QW_TASK_DLOG("drop msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); + qwBuildAndSendDropRsp(dropConnection, code); + QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code)); } if (cancelConnection) { - qwBuildAndSendCancelRsp(cancelConnection, output->rspCode); - QW_TASK_DLOG("cancel msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); + qwBuildAndSendCancelRsp(cancelConnection, code); + QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code)); + } + + if (code) { + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); } - QW_TASK_DLOG("end to handle event at phase %s", qwPhaseStr(phase)); + QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code)); QW_RET(code); } @@ -1005,22 +868,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { bool queryRsped = false; bool needStop = false; struct SSubplan *plan = NULL; - int32_t rspCode = 0; SQWPhaseInput input = {0}; - SQWPhaseOutput output = {0}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; - QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output)); - - needStop = output.needStop; - code = output.rspCode; - - if (needStop) { - QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_QUERY); - QW_ERR_JRET(code); - } + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -1028,13 +881,13 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { code = qStringToSubplan(qwMsg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { - QW_TASK_ELOG("task string to subplan failed, code:%s", tstrerror(code)); + QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); } - code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle); if (code) { - QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code)); + QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); } @@ -1043,10 +896,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - //TODO OPTIMIZE EMTYP RESULT QUERY RSP TO AVOID FURTHER FETCH - QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code)); - QW_TASK_DLOG("query msg rsped, code:%d", code); + QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code)); queryRsped = true; @@ -1059,57 +910,52 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { _return: - if (code) { - rspCode = code; + input.code = code; + code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); + + if (TSDB_CODE_SUCCESS == code) { + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED); } if (!queryRsped) { - qwBuildAndSendQueryRsp(qwMsg->connection, rspCode); - QW_TASK_DLOG("query msg rsped, code:%x", rspCode); - } - - input.code = rspCode; - input.taskStatus = rspCode ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_PARTIAL_SUCCEED; - - QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output)); - - QW_RET(rspCode); + qwBuildAndSendQueryRsp(qwMsg->connection, code); + QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code)); + } } int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; SQWTaskCtx *ctx = NULL; int8_t phase = 0; - bool needRsp = false; - int32_t rspCode = 0; + bool needRsp = true; QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || - QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("task already cancelled/dropped, phase:%d", phase); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG_E("task is dropping or already dropped"); + QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } - - phase = QW_GET_PHASE(ctx); - if (phase == QW_PHASE_PRE_QUERY) { + if (ctx->phase == QW_PHASE_PRE_QUERY) { QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY); ctx->readyConnection = qwMsg->connection; - QW_TASK_DLOG("ready msg not rsped, phase:%d", phase); - } else if (phase == QW_PHASE_POST_QUERY) { - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); - needRsp = true; - rspCode = ctx->rspCode; - } else { - QW_TASK_ELOG("invalid phase when got ready msg, phase:%d", phase); - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); - needRsp = true; - rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); + needRsp = false; + QW_TASK_DLOG_E("ready msg will not rsp now"); + goto _return; + } + + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); + + if (ctx->phase == QW_PHASE_POST_QUERY) { + code = ctx->rspCode; + goto _return; } + + QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase)); + + QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); _return: @@ -1117,14 +963,18 @@ _return: QW_UPDATE_RSP_CODE(ctx, code); } + if (code) { + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); + } + if (ctx) { QW_UNLOCK(QW_WRITE, &ctx->lock); qwReleaseTaskCtx(mgmt, ctx); } if (needRsp) { - qwBuildAndSendReadyRsp(qwMsg->connection, rspCode); - QW_TASK_DLOG("ready msg rsped, code:%x", rspCode); + qwBuildAndSendReadyRsp(qwMsg->connection, code); + QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code)); } QW_RET(code); @@ -1134,25 +984,13 @@ _return: int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { SQWTaskCtx *ctx = NULL; int32_t code = 0; - bool queryRsped = false; - bool needStop = false; - struct SSubplan *plan = NULL; SQWPhaseInput input = {0}; - SQWPhaseOutput output = {0}; void *rsp = NULL; int32_t dataLen = 0; bool queryEnd = false; do { - QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output)); - - needStop = output.needStop; - code = output.rspCode; - - if (needStop) { - QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_CQUERY); - QW_ERR_JRET(code); - } + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -1166,7 +1004,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { - QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); + QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus)); // RC WARNING atomic_store_8(&ctx->queryContinue, 1); @@ -1185,11 +1023,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } } - if (queryEnd) { - needStop = true; - } - - _return: +_return: if (NULL == ctx) { break; @@ -1200,51 +1034,33 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwFreeFetchRsp(rsp); rsp = NULL; qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); - QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, 0); + QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code)); } QW_LOCK(QW_WRITE, &ctx->lock); - if (needStop || code || 0 == atomic_load_8(&ctx->queryContinue)) { + if (queryEnd || code || 0 == atomic_load_8(&ctx->queryContinue)) { + // Note: if necessary, fetch need to put cquery to queue again atomic_store_8(&ctx->phase, 0); QW_UNLOCK(QW_WRITE,&ctx->lock); break; } - QW_UNLOCK(QW_WRITE,&ctx->lock); } while (true); input.code = code; - qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output); - - QW_RET(code); + QW_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL)); } int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; - int32_t needRsp = true; - void *data = NULL; - int32_t sinkStatus = 0; int32_t dataLen = 0; - bool queryEnd = false; - bool needStop = false; bool locked = false; SQWTaskCtx *ctx = NULL; - int8_t status = 0; void *rsp = NULL; - SQWPhaseInput input = {0}; - SQWPhaseOutput output = {0}; - QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, &output)); - - needStop = output.needStop; - code = output.rspCode; - - if (needStop) { - QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_FETCH); - QW_ERR_JRET(code); - } + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -1259,7 +1075,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { - QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); + QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus)); QW_LOCK(QW_WRITE, &ctx->lock); locked = true; @@ -1268,16 +1084,11 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_IS_QUERY_RUNNING(ctx)) { atomic_store_8(&ctx->queryContinue, 1); } else if (0 == atomic_load_8(&ctx->queryInQueue)) { - if (!ctx->multiExec) { - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); - ctx->multiExec = true; - } + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); atomic_store_8(&ctx->queryInQueue, 1); QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection)); - - QW_TASK_DLOG("schedule query in queue, phase:%d", ctx->phase); } } @@ -1288,20 +1099,15 @@ _return: } input.code = code; - - qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, &output); - - if (output.rspCode) { - code = output.rspCode; - } + code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL); if (code) { qwFreeFetchRsp(rsp); rsp = NULL; dataLen = 0; - qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); - QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); - } else if (rsp) { + } + + if (code || rsp) { qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); } @@ -1316,6 +1122,8 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { SQWTaskCtx *ctx = NULL; bool locked = false; + // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED + QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -1323,22 +1131,18 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { locked = true; if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("task already dropping, phase:%d", ctx->phase); + QW_TASK_WLOG_E("task already dropping"); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } if (QW_IS_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); } else if (ctx->phase > 0) { - QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); - - QW_SET_RSP_CODE(ctx, TSDB_CODE_QRY_TASK_DROPPED); - - locked = false; + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); needRsp = true; + } else { + // task not started } if (!needRsp) { @@ -1351,6 +1155,8 @@ _return: if (code) { QW_UPDATE_RSP_CODE(ctx, code); + + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); } if (locked) { diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 8db661f197..98b917c525 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -263,7 +263,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) { QW_ERR_RET(code); } - QW_SCH_TASK_DLOG("put task continue exec msg to query queue, vgId:%d", mgmt->nodeId); + QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId); return TSDB_CODE_SUCCESS; } -- GitLab