From 93e94127e287a22eb4e92d5a6683a4d0a67f78ab Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 10 Jan 2022 18:22:12 +0800 Subject: [PATCH] feature/qnode --- include/libs/qcom/query.h | 1 + source/libs/scheduler/inc/schedulerInt.h | 16 +- source/libs/scheduler/src/scheduler.c | 724 ++++++++++++------ source/libs/scheduler/test/schedulerTests.cpp | 14 +- 4 files changed, 500 insertions(+), 255 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index da88366f11..92bbb074f1 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -35,6 +35,7 @@ enum { JOB_TASK_STATUS_CANCELLING, JOB_TASK_STATUS_CANCELLED, JOB_TASK_STATUS_DROPPING, + JOB_TASK_STATUS_FREEING, }; typedef struct STableComInfo { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 270f255ec0..4195376812 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -61,14 +61,17 @@ typedef struct SSchLevel { typedef struct SSchTask { uint64_t taskId; // task id + SRWLatch lock; // task lock SSchLevel *level; // level SSubplan *plan; // subplan char *msg; // operator tree int32_t msgLen; // msg length int8_t status; // task status - SQueryNodeAddr execAddr; // task actual executed node address + int32_t lastMsgType; // last sent msg type + SQueryNodeAddr succeedAddr; // task executed success node address int8_t candidateIdx; // current try condidation index SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr + SArray *execAddrs; // all tried node for current task, element is SQueryNodeAddr SQueryProfileSummary summary; // task execution summary int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* @@ -96,22 +99,24 @@ typedef struct SSchJob { SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* + int32_t ref; int8_t status; SQueryNodeAddr resNode; tsem_t rspSem; - int32_t userFetch; + int8_t userFetch; int32_t remoteFetch; SSchTask *fetchTask; int32_t errCode; - void *res; + void *res; //TODO free it or not int32_t resNumOfRows; SQueryProfileSummary summary; } SSchJob; -#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children)) + #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) +#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->type == QUERY_TYPE_MODIFY) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -127,6 +132,7 @@ typedef struct SSchJob { #define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) #define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) +#define SCH_TASK_WLOG(param, ...) qWarn("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) @@ -137,7 +143,7 @@ typedef struct SSchJob { static int32_t schLaunchTask(SSchJob *job, SSchTask *task); -static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType); +static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 25b8ec981e..c13aacb2ad 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -20,9 +20,75 @@ static SSchedulerMgmt schMgmt = {0}; -int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) { + +int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { + pTask->plan = pPlan; + pTask->level = pLevel; + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); + pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + pTask->execAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + if (NULL == pTask->execAddrs) { + SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CONDIDATE_EP_NUM); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; +} + +void schFreeTask(SSchTask* pTask) { + if (pTask->candidateAddrs) { + taosArrayDestroy(pTask->candidateAddrs); + } + + // TODO NEED TO VERFY WITH ASYNC_SEND MEMORY FREE + //tfree(pTask->msg); + + if (pTask->children) { + taosArrayDestroy(pTask->children); + } + + if (pTask->parents) { + taosArrayDestroy(pTask->parents); + } +} + + +int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { + int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType); + + switch (msgType) { + case TDMT_VND_CREATE_TABLE_RSP: + case TDMT_VND_SUBMIT_RSP: + case TDMT_VND_QUERY_RSP: + case TDMT_VND_RES_READY_RSP: + case TDMT_VND_FETCH_RSP: + case TDMT_VND_DROP_TASK: + if (lastMsgType != (msgType - 1)) { + SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%d, rspType:%d", lastMsgType, msgType); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { + SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + break; + default: + SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); + + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { int32_t code = 0; + int8_t oriStatus = SCH_GET_JOB_STATUS(pJob); + /* if (oriStatus == newStatus) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); @@ -80,6 +146,10 @@ int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) { } */ + SCH_SET_JOB_STATUS(pJob, newStatus); + + SCH_JOB_DLOG("status updated from %d to %d", oriStatus, newStatus); + return TSDB_CODE_SUCCESS; _return: @@ -137,6 +207,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + } else { + if (0 != pLevel->level) { + SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } } for (int32_t n = 0; n < parentNum; ++n) { @@ -166,19 +241,28 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { return TSDB_CODE_SUCCESS; } -int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { - pTask->plan = pPlan; - pTask->level = pLevel; - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); - pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + +int32_t schRecordTaskSucceedNode(SSchTask *pTask) { + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); + + assert(NULL != addr); + + pTask->succeedAddr = *addr; return TSDB_CODE_SUCCESS; } -void schFreeTask(SSchTask* pTask) { - taosArrayDestroy(pTask->candidateAddrs); + +int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr) { + if (NULL == taosArrayPush(pTask->execAddrs, addr)) { + SCH_TASK_ELOG("taosArrayPush addr to execAddr list failed, errno:%d", errno); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; } + int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { int32_t code = 0; @@ -257,7 +341,7 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { SSchTask task = {0}; SSchTask *pTask = &task; - schInitTask(pJob, &task, plan, pLevel); + SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel)); void *p = taosArrayPush(pLevel->subTasks, &task); if (NULL == p) { @@ -346,46 +430,67 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); if (0 != code) { if (HASH_NODE_EXIST(code)) { - SCH_TASK_ELOG("task already in exec list, code:%x", code); + SCH_TASK_ELOG("task already in execTask list, code:%x", code); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SCH_TASK_ELOG("taosHashPut task to exec list failed, errno:%d", errno); + SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("task added to exec list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); + SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { - if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { - qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId); - return TSDB_CODE_SUCCESS; +int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { + if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { + SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); } - if (0 != taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { - qError("taosHashPut failed"); + int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); + if (0 != code) { + if (HASH_NODE_EXIST(code)) { + *moved = true; + + SCH_TASK_ELOG("task already in succTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } *moved = true; + + SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks)); return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { - if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { - qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId); +int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { + *moved = false; + + if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { + SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); } - if (0 != taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { - qError("taosHashPut failed"); + int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); + if (0 != code) { + if (HASH_NODE_EXIST(code)) { + *moved = true; + + SCH_TASK_WLOG("task already in failTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } *moved = true; + + SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks)); return TSDB_CODE_SUCCESS; } @@ -393,6 +498,8 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info + // TODO set condidateIdx + // TODO record failed but tried task *needRetry = false; @@ -400,60 +507,70 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b } -int32_t schFetchFromRemote(SSchJob *job) { +int32_t schFetchFromRemote(SSchJob *pJob) { int32_t code = 0; - if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { - qInfo("prior fetching not finished"); + if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) { + SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch)); return TSDB_CODE_SUCCESS; } - SCH_ERR_JRET(schBuildAndSendMsg(job, job->fetchTask, TDMT_VND_FETCH)); + if (atomic_load_ptr(&pJob->res)) + + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); return TSDB_CODE_SUCCESS; _return: - atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); + + atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); return code; } -int32_t schProcessOnJobPartialSuccess(SSchJob *job) { - job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; - - bool needFetch = job->userFetch; - if ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule) { - tsem_post(&job->rspSem); - } +// Note: no more error processing, handled in function internal +int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED)); - if (needFetch) { - SCH_ERR_RET(schFetchFromRemote(job)); + if (errCode) { + atomic_store_32(&pJob->errCode, errCode); } - return TSDB_CODE_SUCCESS; + if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) { + tsem_post(&pJob->rspSem); + } + + SCH_ERR_RET(atomic_load_32(&pJob->errCode)); + + assert(0); } -int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { - int8_t status = SCH_GET_JOB_STATUS(pJob); - if (schValidateStatus(pJob, status, JOB_TASK_STATUS_FAILED)) { - SCH_ERR_RET(atomic_load_32(&pJob->errCode)); - } - - SCH_SET_JOB_STATUS(pJob, JOB_TASK_STATUS_FAILED); +// Note: no more error processing, handled in function internal +int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { + int32_t code = 0; - atomic_store_32(&pJob->errCode, errCode); - - atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED)); - if (pJob->userFetch || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) { + if ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule) { tsem_post(&pJob->rspSem); } + + if (atomic_load_8(&pJob->userFetch)) { + SCH_ERR_JRET(schFetchFromRemote(pJob)); + } return TSDB_CODE_SUCCESS; + +_return: + + SCH_ERR_RET(schProcessOnJobFailure(pJob, code)); + + SCH_RET(code); } + int32_t schProcessOnDataFetched(SSchJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); @@ -461,25 +578,77 @@ int32_t schProcessOnDataFetched(SSchJob *job) { } -int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { + +// Note: no more error processing, handled in function internal +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { + bool needRetry = false; bool moved = false; + int32_t taskDone = 0; int32_t code = 0; + + SCH_TASK_DLOG("taskOnFailure, code:%x", errCode); - SCH_ERR_RET(schMoveTaskToSuccList(pJob, pTask, &moved)); - if (!moved) { - SCH_TASK_ELOG(" task may already moved, status:%d", pTask->status); + SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); + + if (!needRetry) { + SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode); + + if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { + code = schMoveTaskToFailList(pJob, pTask, &moved); + if (code && moved) { + SCH_ERR_RET(errCode); + } + } + + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); + + if (SCH_TASK_NEED_WAIT_ALL(pTask)) { + SCH_LOCK(SCH_WRITE, &pTask->level->lock); + pTask->level->taskFailed++; + taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; + SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); + + atomic_store_32(&pJob->errCode, errCode); + + if (taskDone < pTask->level->taskNum) { + SCH_TASK_DLOG("not all tasks done, done:%d, all:%d", taskDone, pTask->level->taskNum); + SCH_ERR_RET(errCode); + } + } + } else { + // Note: no more error processing, already handled + SCH_ERR_RET(schLaunchTask(pJob, pTask)); + return TSDB_CODE_SUCCESS; } +_return: + + SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode)); + + SCH_ERR_RET(errCode); +} + + +// Note: no more error processing, handled in function internal +int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { + bool moved = false; + int32_t code = 0; + SSchTask *pErrTask = pTask; + + SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask)); + + code = schMoveTaskToSuccList(pJob, pTask, &moved); + if (code && moved) { + SCH_ERR_RET(code); + } + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); + + SCH_ERR_JRET(schRecordTaskSucceedNode(pTask)); int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { - if (pTask->level->level != 0) { - SCH_TASK_ELOG("no parent task level error, level:%d", pTask->level->level); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - int32_t taskDone = 0; if (SCH_TASK_NEED_WAIT_ALL(pTask)) { @@ -490,22 +659,23 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { if (taskDone < pTask->level->taskNum) { SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); + return TSDB_CODE_SUCCESS; } else if (taskDone > pTask->level->taskNum) { assert(0); } if (pTask->level->taskFailed > 0) { - pJob->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(pJob, TSDB_CODE_QRY_APP_ERROR)); - - return TSDB_CODE_SUCCESS; + SCH_RET(schProcessOnJobFailure(pJob, 0)); + } else { + SCH_RET(schProcessOnJobPartialSuccess(pJob)); } } else { - pJob->resNode = pTask->execAddr; + pJob->resNode = pTask->succeedAddr; } pJob->fetchTask = pTask; + SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob)); return TSDB_CODE_SUCCESS; @@ -522,14 +692,18 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { for (int32_t i = 0; i < parentNum; ++i) { SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i); - + pErrTask = par; + atomic_add_fetch_32(&par->childReady, 1); - code = qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr); + SCH_LOCK(SCH_WRITE, &par->lock); + code = qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->succeedAddr); if (code) { + SCH_UNLOCK(SCH_WRITE, &par->lock); SCH_TASK_ELOG("qSetSubplanExecutionNode failed, code:%x, templateId:%"PRIx64, code, pTask->plan->id.templateId); - SCH_ERR_RET(code); + SCH_ERR_JRET(code); } + SCH_UNLOCK(SCH_WRITE, &par->lock); if (SCH_TASK_READY_TO_LUNCH(par)) { SCH_ERR_RET(schLaunchTask(pJob, par)); @@ -537,158 +711,142 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { } return TSDB_CODE_SUCCESS; -} - -int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { - bool needRetry = false; - bool moved = false; - int32_t taskDone = 0; - SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); - - if (!needRetry) { - SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode); - - if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { - SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved)); - if (!moved) { - SCH_TASK_ELOG("task may already moved, status:%d", pTask->status); - } - } - - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); - - if (SCH_TASK_NEED_WAIT_ALL(pTask)) { - SCH_LOCK(SCH_WRITE, &pTask->level->lock); - pTask->level->taskFailed++; - taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; - SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); - - if (taskDone < pTask->level->taskNum) { - qDebug("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); - return TSDB_CODE_SUCCESS; - } - } - - SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode)); - return errCode; - } +_return: - SCH_ERR_RET(schLaunchTask(pJob, pTask)); + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pErrTask, code)); - return TSDB_CODE_SUCCESS; + SCH_ERR_RET(code); } -int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { +int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; + SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType)); + switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: { - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schProcessOnTaskSuccess(job, task); - if (code) { - goto _task_error; + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); } - } + + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); - break; - } + break; + } case TDMT_VND_SUBMIT_RSP: { - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; - job->resNumOfRows += rsp->affectedRows; - - code = schProcessOnTaskSuccess(job, task); - if (code) { - goto _task_error; - } + #if 0 //TODO OPEN THIS + SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; + + if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + } + + pJob->resNumOfRows += rsp->affectedRows; + #else + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); } + #endif + + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + break; } case TDMT_VND_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; - if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); - } else { - code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY); - if (code) { - goto _task_error; - } + if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code)); } + + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); + break; } case TDMT_VND_RES_READY_RSP: { SResReadyRsp *rsp = (SResReadyRsp *)msg; - if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); - } else { - code = schProcessOnTaskSuccess(job, task); - if (code) { - goto _task_error; - } + if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code)); } + + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + break; } case TDMT_VND_FETCH_RSP: { - SCH_ERR_JRET(rspCode); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - job->res = rsp; - if (rsp) { - job->resNumOfRows = rsp->numOfRows; + if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); } + + atomic_store_ptr(&pJob->res, rsp); + atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows); + + SCH_ERR_JRET(schProcessOnDataFetched(pJob)); - SCH_ERR_JRET(schProcessOnDataFetched(job)); break; } case TDMT_VND_DROP_TASK: { - + // SHOULD NEVER REACH HERE + assert(0); + break; } default: - qError("unknown msg type:%d received", msgType); - return TSDB_CODE_QRY_INVALID_INPUT; + SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); + + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } return TSDB_CODE_SUCCESS; -_task_error: - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code)); - return TSDB_CODE_SUCCESS; - _return: - code = schProcessOnJobFailure(job, code); - return code; + + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); + + SCH_RET(code); } int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; + SSchJob *pJob = NULL; + SSchTask *pTask = NULL; SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); if (NULL == job || NULL == (*job)) { - qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId); + qError("QID:%"PRIx64" taosHashGet queryId not exist", pParam->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - int32_t s = taosHashGetSize((*job)->execTasks); + pJob = *job; + + atomic_add_fetch_32(&pJob->ref, 1); + + int32_t s = taosHashGetSize(pJob->execTasks); assert(s != 0); - SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); + SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId)); if (NULL == task || NULL == (*task)) { - qError("failed to get task, taskId:%"PRIx64" not exist, reqId:0x%"PRIx64, pParam->taskId, (*job)->queryId); + qError("QID:%"PRIx64",TID:%"PRIx64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } + + pTask = *task; + + SCH_TASK_DLOG("rsp msg received, type:%d, code:%x", msgType, rspCode); - schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode); + SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); _return: + + if (pJob) { + atomic_sub_fetch_32(&pJob->ref, 1); + } + tfree(param); SCH_RET(code); } @@ -715,7 +873,7 @@ int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { SSchCallbackParam *pParam = (SSchCallbackParam *)param; - qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code); + qDebug("QID:%"PRIx64",TID:%"PRIx64" drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code); } int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { @@ -739,7 +897,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { *fp = schHandleDropCallback; break; default: - qError("unknown msg type:%d", msgType); + qError("unknown msg type for callback, msgType:%d", msgType); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -771,12 +929,15 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.len = msgSize; pMsgSendInfo->msgType = msgType; - pMsgSendInfo->fp = fp; int64_t transporterId = 0; - SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo)); + code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo); + if (code) { + qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code); + SCH_ERR_JRET(code); + } return TSDB_CODE_SUCCESS; @@ -798,15 +959,19 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { } } - -int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { +int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) { uint32_t msgSize = 0; void *msg = NULL; int32_t code = 0; + bool isCandidateAddr = false; SEpSet epSet; - - SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); - + + if (NULL == addr) { + addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); + + isCandidateAddr = true; + } + schConvertAddrToEpSet(addr, &epSet); switch (msgType) { @@ -887,18 +1052,25 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { break; } default: - SCH_TASK_ELOG("unknown msg type:%d", msgType); + SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); break; } + atomic_store_32(&pTask->lastMsgType, msgType); SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); + if (isCandidateAddr) { + SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr)); + } + return TSDB_CODE_SUCCESS; _return: + atomic_store_32(&pTask->lastMsgType, -1); + tfree(msg); SCH_RET(code); } @@ -913,13 +1085,19 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING); } + +// Note: no more error processing, handled in function internal int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { int8_t status = 0; int32_t code = 0; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status); - SCH_ERR_RET(atomic_load_32(&pJob->errCode)); + + code = atomic_load_32(&pJob->errCode); + SCH_ERR_RET(code); + + SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); } SSubplan *plan = pTask->plan; @@ -941,56 +1119,68 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, plan->msgType)); + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); return TSDB_CODE_SUCCESS; _return: - code = schProcessOnTaskFailure(pJob, pTask, code); + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); SCH_RET(code); } int32_t schLaunchJob(SSchJob *pJob) { SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); + + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING)); for (int32_t i = 0; i < level->taskNum; ++i) { SSchTask *pTask = taosArrayGet(level->subTasks, i); SCH_ERR_RET(schLaunchTask(pJob, pTask)); } - - pJob->status = JOB_TASK_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; } -void schDropJobAllTasks(SSchJob *job) { - void *pIter = taosHashIterate(job->succTasks, NULL); - while (pIter) { - SSchTask *task = *(SSchTask **)pIter; +void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { + if (NULL == pTask->execAddrs) { + SCH_TASK_DLOG("no exec address, status:%d", SCH_GET_TASK_STATUS(pTask)); + return; + } - int32_t msgType = task->plan->msgType; - if (msgType == TDMT_VND_CREATE_TABLE || msgType == TDMT_VND_SUBMIT) { - break; - } + int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs); + + if (size <= 0) { + SCH_TASK_DLOG("empty exec address, status:%d", SCH_GET_TASK_STATUS(pTask)); + return; + } - schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); - pIter = taosHashIterate(job->succTasks, pIter); - } + SQueryNodeAddr *addr = NULL; + for (int32_t i = 0; i < size; ++i) { + addr = (SQueryNodeAddr *)taosArrayGet(pTask->execAddrs, i); - pIter = taosHashIterate(job->failTasks, NULL); + schBuildAndSendMsg(pJob, pTask, addr, TDMT_VND_DROP_TASK); + } +} + +void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { + void *pIter = taosHashIterate(list, NULL); while (pIter) { - SSchTask *task = *(SSchTask **)pIter; + SSchTask *pTask = *(SSchTask **)pIter; - int32_t msgType = task->plan->msgType; - if (msgType == TDMT_VND_CREATE_TABLE || msgType == TDMT_VND_SUBMIT) { - break; + if (!SCH_TASK_NO_NEED_DROP(pTask)) { + schDropTaskOnExecutedNode(pJob, pTask); } + + pIter = taosHashIterate(list, pIter); + } +} - schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); - pIter = taosHashIterate(job->succTasks, pIter); - } +void schDropJobAllTasks(SSchJob *pJob) { + schDropTaskInHashList(pJob, pJob->execTasks); + schDropTaskInHashList(pJob, pJob->succTasks); + schDropTaskInHashList(pJob, pJob->failTasks); } int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) { @@ -1066,6 +1256,13 @@ _return: SCH_RET(code); } +int32_t schCancelJob(SSchJob *pJob) { + //TODO + + //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST + +} + int32_t schedulerInit(SSchedulerCfg *cfg) { if (schMgmt.jobs) { @@ -1104,10 +1301,12 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true)); + SSchJob *job = NULL; + + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, (void **)&job, true)); + + *pJob = job; - SSchJob *job = *(SSchJob **)pJob; - pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; @@ -1119,93 +1318,129 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - return schExecJobImpl(transport, nodeList, pDag, pJob, false); + SSchJob *job = NULL; + + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, (void **)&job, false)); + + *pJob = job; + + return TSDB_CODE_SUCCESS; } -int32_t scheduleFetchRows(void *pJob, void **data) { - if (NULL == pJob || NULL == data) { +int32_t scheduleFetchRows(void *job, void **data) { + if (NULL == job || NULL == data) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SSchJob *job = pJob; + SSchJob *pJob = job; int32_t code = 0; - if (!SCH_JOB_NEED_FETCH(&job->attr)) { - qError("no need to fetch data"); + atomic_add_fetch_32(&pJob->ref, 1); + + if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { + SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); + atomic_sub_fetch_32(&pJob->ref, 1); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - if (job->status == JOB_TASK_STATUS_FAILED) { - job->res = NULL; - SCH_RET(atomic_load_32(&job->errCode)); + if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { + SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); + atomic_sub_fetch_32(&pJob->ref, 1); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - if (job->status == JOB_TASK_STATUS_SUCCEED) { - job->res = NULL; - return TSDB_CODE_SUCCESS; - } + int8_t status = SCH_GET_JOB_STATUS(pJob); - if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { - qError("prior fetching not finished"); - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + if (status == JOB_TASK_STATUS_FAILED) { + *data = atomic_load_ptr(&pJob->res); + atomic_store_ptr(&pJob->res, NULL); + SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); + } else if (status == JOB_TASK_STATUS_SUCCEED) { + *data = atomic_load_ptr(&pJob->res); + atomic_store_ptr(&pJob->res, NULL); + goto _return; + } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_ERR_JRET(schFetchFromRemote(pJob)); } - if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { - SCH_ERR_JRET(schFetchFromRemote(job)); - } + tsem_wait(&pJob->rspSem); - tsem_wait(&job->rspSem); + status = SCH_GET_JOB_STATUS(pJob); - if (job->status == JOB_TASK_STATUS_FAILED) { - code = atomic_load_32(&job->errCode); + if (status == JOB_TASK_STATUS_FAILED) { + code = atomic_load_32(&pJob->errCode); } - if (job->res && ((SRetrieveTableRsp *)job->res)->completed) { - job->status = JOB_TASK_STATUS_SUCCEED; + if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) { + SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); } - *data = job->res; - job->res = NULL; + while (true) { + *data = atomic_load_ptr(&pJob->res); + + if (*data != atomic_val_compare_exchange_ptr(&pJob->res, *data, NULL)) { + continue; + } + + break; + } _return: - atomic_val_compare_exchange_32(&job->userFetch, 1, 0); + + atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + + atomic_sub_fetch_32(&pJob->ref, 1); SCH_RET(code); } -int32_t scheduleCancelJob(void *pJob) { - //TODO +int32_t scheduleCancelJob(void *job) { + SSchJob *pJob = (SSchJob *)job; - //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST + atomic_add_fetch_32(&pJob->ref, 1); - return TSDB_CODE_SUCCESS; + int32_t code = schCancelJob(pJob); + + atomic_sub_fetch_32(&pJob->ref, 1); + + SCH_RET(code); } -void scheduleFreeJob(void *pJob) { - if (NULL == pJob) { +void scheduleFreeJob(void *job) { + if (NULL == job) { return; } - SSchJob *job = pJob; + SSchJob *pJob = job; - if (job->status > 0) { - if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) { - qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed - return; - } + if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) { + SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob); + return; + } - if (job->status == JOB_TASK_STATUS_EXECUTING) { - scheduleCancelJob(pJob); + while (true) { + int32_t ref = atomic_load_32(&pJob->ref); + if (0 == ref) { + break; + } else if (ref > 0) { + usleep(1); + } else { + assert(0); } + } - schDropJobAllTasks(job); + if (pJob->status == JOB_TASK_STATUS_EXECUTING) { + schCancelJob(pJob); } - job->subPlans = NULL; // it is a reference to pDag->pSubplans - int32_t numOfLevels = taosArrayGetSize(job->levels); + schDropJobAllTasks(pJob); + + pJob->subPlans = NULL; // it is a reference to pDag->pSubplans + + int32_t numOfLevels = taosArrayGetSize(pJob->levels); for(int32_t i = 0; i < numOfLevels; ++i) { - SSchLevel *pLevel = taosArrayGet(job->levels, i); + SSchLevel *pLevel = taosArrayGet(pJob->levels, i); int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); for(int32_t j = 0; j < numOfTasks; ++j) { @@ -1216,12 +1451,15 @@ void scheduleFreeJob(void *pJob) { taosArrayDestroy(pLevel->subTasks); } - taosHashCleanup(job->execTasks); - taosHashCleanup(job->failTasks); - taosHashCleanup(job->succTasks); - taosArrayDestroy(job->levels); + taosHashCleanup(pJob->execTasks); + taosHashCleanup(pJob->failTasks); + taosHashCleanup(pJob->succTasks); + + taosArrayDestroy(pJob->levels); + + tfree(pJob->res); - tfree(job); + tfree(pJob); } void schedulerDestroy(void) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 790778b736..9bc08fa24b 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -36,7 +36,7 @@ namespace { -extern "C" int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); +extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); void schtBuildQueryDag(SQueryDag *dag) { uint64_t qId = 0x0000000000000001; @@ -188,7 +188,7 @@ void *schtSendRsp(void *param) { SShellSubmitRspMsg rsp = {0}; rsp.affectedRows = 10; - schProcessRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); + schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); pIter = taosHashIterate(job->execTasks, pIter); } @@ -233,7 +233,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -244,7 +244,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -255,7 +255,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -266,7 +266,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -275,7 +275,7 @@ TEST(queryTest, normalCase) { SRetrieveTableRsp rsp = {0}; rsp.completed = 1; rsp.numOfRows = 10; - code = schProcessRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); -- GitLab