From f9f5e6009e9946875e8bcdb1d8a5f1682ab6a919 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 24 Mar 2022 09:29:03 +0800 Subject: [PATCH] feature/scheduler --- source/libs/qworker/src/qworker.c | 29 ++++++++++++++++++++------- source/libs/scheduler/src/scheduler.c | 14 ++++++++++++- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index e61123ef91..876807d17c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -519,7 +519,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { while (true) { QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); - + + taosSsleep(20); code = qExecTask(*taskHandle, &pRes, &useconds); if (code) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -730,9 +731,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + dropConnection = &ctx->connInfo; QW_ERR_JRET(qwDropTask(QW_FPARAMS())); + dropConnection = NULL; + + qwBuildAndSendDropRsp(&ctx->connInfo, code); + QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); - dropConnection = &ctx->connInfo; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); break; } @@ -764,9 +769,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + dropConnection = &ctx->connInfo; QW_ERR_JRET(qwDropTask(QW_FPARAMS())); + dropConnection = NULL; + + qwBuildAndSendDropRsp(&ctx->connInfo, code); + QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); - dropConnection = &ctx->connInfo; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -847,6 +856,9 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } + + qwBuildAndSendDropRsp(&ctx->connInfo, code); + QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_ERR_JRET(qwDropTask(QW_FPARAMS())); @@ -1163,7 +1175,7 @@ _return: int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; - bool needRsp = false; + bool rsped = false; SQWTaskCtx *ctx = NULL; bool locked = false; @@ -1184,13 +1196,16 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING); } else if (ctx->phase > 0) { + qwBuildAndSendDropRsp(&ctx->connInfo, code); + QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - needRsp = true; + rsped = true; } else { // task not started } - if (!needRsp) { + if (!rsped) { ctx->connInfo.handle == qwMsg->connInfo.handle; ctx->connInfo.ahandle = qwMsg->connInfo.ahandle; @@ -1215,7 +1230,7 @@ _return: qwReleaseTaskCtx(mgmt, ctx); } - if (TSDB_CODE_SUCCESS != code || needRsp) { + if (TSDB_CODE_SUCCESS != code) { qwBuildAndSendDropRsp(&qwMsg->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 5a1a8581b0..77f8ccf8cc 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -147,11 +147,23 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } + SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + return TSDB_CODE_SUCCESS; + case TDMT_VND_FETCH_RSP: + if (lastMsgType != reqMsgType && -1 != lastMsgType) { + SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; case TDMT_VND_CREATE_TABLE_RSP: case TDMT_VND_SUBMIT_RSP: - case TDMT_VND_FETCH_RSP: break; default: SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus)); -- GitLab