From 30556c676f4e6187f5926992e76523f39aec5094 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 8 Jan 2022 10:35:29 +0800 Subject: [PATCH] feature/qnode --- source/libs/scheduler/inc/schedulerInt.h | 6 ++ source/libs/scheduler/src/scheduler.c | 131 ++++++++++++++--------- 2 files changed, 89 insertions(+), 48 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 014385c3a6..270f255ec0 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -113,6 +113,12 @@ typedef struct SSchJob { #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) +#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) +#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) + +#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st) +#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) + #define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY) #define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 6c6d5d8385..56e0fe9a5c 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -100,7 +100,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; pTask->level = pLevel; - pTask->status = JOB_TASK_STATUS_NOT_START; + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); return TSDB_CODE_SUCCESS; @@ -236,7 +236,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("use execNode from plan as candidate addr"); + SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.numOfEps); return TSDB_CODE_SUCCESS; } @@ -273,13 +273,19 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { - if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { - qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId); + int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); + if (0 != code) { + if (HASH_NODE_EXIST(code)) { + SCH_TASK_ELOG("task already in exec list, code:%x", code); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SCH_TASK_ELOG("taosHashPut task to exec list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), - pJob->queryId); + SCH_TASK_DLOG("task added to exec list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); + return TSDB_CODE_SUCCESS; } @@ -387,7 +393,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - pTask->status = JOB_TASK_STATUS_SUCCEED; + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { @@ -460,10 +466,14 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) if (!needRetry) { SCH_TASK_ELOG("task failed[%x], no more retry", errCode); - SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved)); - if (!moved) { - SCH_TASK_ELOG("task may already moved, status:%d", pTask->status); - } + if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { + SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved)); + if (!moved) { + SCH_TASK_ELOG("task may already moved, status:%d", pTask->status); + } + } + + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); if (SCH_TASK_NEED_WAIT_ALL(pTask)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); @@ -476,11 +486,10 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) return TSDB_CODE_SUCCESS; } } - - pJob->status = JOB_TASK_STATUS_FAILED; + SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode)); - return TSDB_CODE_SUCCESS; + return errCode; } SCH_ERR_RET(schLaunchTask(pJob, pTask)); @@ -706,7 +715,7 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { } -int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { +int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { uint32_t msgSize = 0; void *msg = NULL; int32_t code = 0; @@ -714,22 +723,22 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { switch (msgType) { case TDMT_VND_CREATE_TABLE: case TDMT_VND_SUBMIT: { - if (NULL == task->msg || task->msgLen <= 0) { + if (NULL == pTask->msg || pTask->msgLen <= 0) { qError("submit msg is NULL"); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - msgSize = task->msgLen; - msg = task->msg; + msgSize = pTask->msgLen; + msg = pTask->msg; break; } case TDMT_VND_QUERY: { - if (NULL == task->msg) { + if (NULL == pTask->msg) { qError("query msg is NULL"); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - msgSize = sizeof(SSubQueryMsg) + task->msgLen; + msgSize = sizeof(SSubQueryMsg) + pTask->msgLen; msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); @@ -738,12 +747,12 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { SSubQueryMsg *pMsg = msg; - pMsg->header.vgId = htonl(task->plan->execNode.nodeId); + pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); - pMsg->contentLen = htonl(task->msgLen); - memcpy(pMsg->msg, task->msg, task->msgLen); + pMsg->queryId = htobe64(pJob->queryId); + pMsg->taskId = htobe64(pTask->taskId); + pMsg->contentLen = htonl(pTask->msgLen); + memcpy(pMsg->msg, pTask->msg, pTask->msgLen); break; } case TDMT_VND_RES_READY: { @@ -756,14 +765,14 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { SResReadyMsg *pMsg = msg; - pMsg->header.vgId = htonl(task->plan->execNode.nodeId); + pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); + pMsg->queryId = htobe64(pJob->queryId); + pMsg->taskId = htobe64(pTask->taskId); break; } case TDMT_VND_FETCH: { - if (NULL == task) { + if (NULL == pTask) { SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } msgSize = sizeof(SResFetchMsg); @@ -775,10 +784,10 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { SResFetchMsg *pMsg = msg; - pMsg->header.vgId = htonl(task->plan->execNode.nodeId); + pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); + pMsg->queryId = htobe64(pJob->queryId); + pMsg->taskId = htobe64(pTask->taskId); break; } case TDMT_VND_DROP_TASK:{ @@ -791,10 +800,10 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { STaskDropMsg *pMsg = msg; - pMsg->header.vgId = htonl(task->plan->execNode.nodeId); + pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); + pMsg->queryId = htobe64(pJob->queryId); + pMsg->taskId = htobe64(pTask->taskId); break; } default: @@ -804,11 +813,11 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SEpSet epSet; - SQueryNodeAddr *addr = taosArrayGet(task->candidateAddrs, task->candidateIdx); + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); schConvertAddrToEpSet(addr, &epSet); - SCH_ERR_JRET(schAsyncSendMsg(job->transport, &epSet, job->queryId, task->taskId, msgType, msg, msgSize)); + SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); return TSDB_CODE_SUCCESS; @@ -818,25 +827,51 @@ _return: SCH_RET(code); } +static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { + int8_t status = SCH_GET_JOB_STATUS(pJob); + if (pStatus) { + *pStatus = status; + } + + return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED + || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING); +} int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { - SSubplan *plan = pTask->plan; + int8_t status = 0; + int32_t code = 0; + + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status); + SCH_ERR_RET(pJob->errCode); + } - SCH_ERR_RET(qSubPlanToString(plan, &pTask->msg, &pTask->msgLen)); - SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); + SSubplan *plan = pTask->plan; - if (NULL == pTask->candidateAddrs || taosArrayGetSize(pTask->candidateAddrs) <= 0) { - SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, pTask->taskId); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + if (NULL == pTask->msg) { + code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); + if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) { + SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen); + SCH_ERR_JRET(code); + } } + + SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); // NOTE: race condition: the task should be put into the hash table before send msg to server - SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); - SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, plan->msgType)); + SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); + + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); - pTask->status = JOB_TASK_STATUS_EXECUTING; + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, plan->msgType)); return TSDB_CODE_SUCCESS; + +_return: + + code = schProcessOnTaskFailure(pJob, pTask, code); + + SCH_RET(code); } int32_t schLaunchJob(SSchJob *pJob) { @@ -936,11 +971,11 @@ int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void* *(SSchJob **)job = pJob; if (syncSchedule) { - SCH_JOB_DLOG("will wait for rsp now"); + SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob)); tsem_wait(&pJob->rspSem); } - SCH_JOB_DLOG("job exec done"); + SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob)); return TSDB_CODE_SUCCESS; -- GitLab