diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 375851ff18bad3a0c59e297fbe9a859b72bd0bd0..57bb73c500cb0285b1bcd064b6d0129c99540a13 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -254,6 +254,19 @@ int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ return TSDB_CODE_SUCCESS; } +int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) { + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); + + *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); + if (NULL == (*ctx)) { + QW_TASK_ELOG("ctx not in ctxHash, id:%s", id); + QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); + } + + return TSDB_CODE_SUCCESS; +} + int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, int32_t status, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); @@ -294,19 +307,6 @@ int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, NULL)); } -int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) { - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); - - *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); - if (NULL == (*ctx)) { - QW_TASK_ELOG("ctx not in ctxHash, id:%s", id); - QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); - } - - return TSDB_CODE_SUCCESS; - -} int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) { @@ -322,16 +322,33 @@ void qwReleaseTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt) { QW_UNLOCK(rwType, &mgmt->ctxLock); } +void qwFreeTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + // RC WARNING + qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); + if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { + qDestroyTask(taskHandle); + } +} -void qwFreeTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx) { - if (ctx->taskHandle) { - qDestroyTask(ctx->taskHandle); - ctx->taskHandle = NULL; +int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + int32_t code = 0; + // RC WARNING + qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); + if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { + code = qKillTask(taskHandle); + atomic_store_ptr(&ctx->taskHandle, taskHandle); } - // TODO - if (ctx->sinkHandle) { + QW_RET(code); +} + +void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + qwFreeTaskHandle(QW_FPARAMS(), ctx); + + if (ctx->sinkHandle) { + dsDestroyDataSinker(ctx->sinkHandle); + ctx->sinkHandle = NULL; } } @@ -425,7 +442,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx = NULL; bool locked = false; - QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); + QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -437,9 +454,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, } if (QW_IN_EXECUTOR(ctx)) { - if (ctx->taskHandle) { - QW_ERR_JRET(qKillTask(ctx->taskHandle)); - } + QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); } else if (ctx->phase > 0) { @@ -587,7 +602,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S switch (phase) { case QW_PHASE_PRE_QUERY: { - QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); + QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), &ctx)); ctx->phase = phase; @@ -607,7 +622,6 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S output->needStop = true; QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); - qwFreeTask(QW_FPARAMS(), ctx); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); @@ -620,7 +634,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S break; } case QW_PHASE_POST_QUERY: { - QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); + QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -702,7 +716,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S break; } case QW_PHASE_POST_CQUERY: { - QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); + QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -738,7 +752,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S break; } case QW_PHASE_PRE_FETCH: { - QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); + QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -786,7 +800,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S break; } case QW_PHASE_POST_FETCH: { - QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); + QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -983,7 +997,8 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); - + + // RC WARNING atomic_store_8(&ctx->queryContinue, 1); } @@ -1061,6 +1076,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t QW_LOCK(QW_WRITE, &ctx->lock); locked = true; + // RC WARNING if (QW_IN_EXECUTOR(ctx)) { atomic_store_8(&ctx->queryContinue, 1); } else if (0 == atomic_load_8(&ctx->inQueue)) {