diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index e1eef1c3f51174a79c33080c90451f386f628d71..da88366f11cb9ce4598605a940182da9796a19ab 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -87,13 +87,14 @@ typedef struct SUseDbOutput { SDBVgroupInfo dbVgroup; } SUseDbOutput; -typedef enum { +enum { META_TYPE_NON_TABLE = 1, META_TYPE_CTABLE, META_TYPE_TABLE, - META_TYPE_BOTH_TABLE, + META_TYPE_BOTH_TABLE }; + typedef struct STableMetaOutput { int32_t metaType; char ctbFname[TSDB_TABLE_FNAME_LEN]; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 0982411cf575fb25e462d9c987dcc0422b3b4712..014385c3a66be0518120db3c133dfda479ca5c0a 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -97,7 +97,7 @@ typedef struct SSchJob { SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* int8_t status; - SEpAddr resEp; + SQueryNodeAddr resNode; tsem_t rspSem; int32_t userFetch; int32_t remoteFetch; @@ -109,7 +109,7 @@ typedef struct SSchJob { } SSchJob; #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE -#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE +#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children)) #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) @@ -130,8 +130,8 @@ typedef struct SSchJob { #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) -extern int32_t schLaunchTask(SSchJob *job, SSchTask *task); -extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType); +static int32_t schLaunchTask(SSchJob *job, SSchTask *task); +static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 300f7154a48b00a73deb7072a3ed4373b9630780..6c6d5d838584df1ef63350c5cef7574ad2aef768 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -20,7 +20,7 @@ static SSchedulerMgmt schMgmt = {0}; -static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { +int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { for (int32_t i = 0; i < pJob->levelNum; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); @@ -31,6 +31,11 @@ static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0; if (childNum > 0) { + if (pJob->levelIdx == pLevel->level) { + SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + pTask->children = taosArrayInit(childNum, POINTER_BYTES); if (NULL == pTask->children) { SCH_TASK_ELOG("taosArrayInit %d children failed", childNum); @@ -53,6 +58,11 @@ static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } if (parentNum > 0) { + if (0 == pLevel->level) { + SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + pTask->parents = taosArrayInit(parentNum, POINTER_BYTES); if (NULL == pTask->parents) { SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum); @@ -84,19 +94,10 @@ static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - for (int32_t i = 0; i < pLevel->taskNum; ++i) { - SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); - - if (pTask->parents && taosArrayGetSize(pTask->parents) > 0) { - SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", (int32_t)taosArrayGetSize(pTask->parents)); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - } - return TSDB_CODE_SUCCESS; } -static int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { +int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; pTask->level = pLevel; pTask->status = JOB_TASK_STATUS_NOT_START; @@ -105,11 +106,11 @@ static int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSch return TSDB_CODE_SUCCESS; } -static void schFreeTask(SSchTask* pTask) { +void schFreeTask(SSchTask* pTask) { taosArrayDestroy(pTask->candidateAddrs); } -static int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { +int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { int32_t code = 0; pJob->queryId = pDag->queryId; @@ -217,41 +218,48 @@ _return: SCH_RET(code); } -static int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { - if (task->candidateAddrs) { +int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { + if (NULL != pTask->candidateAddrs) { return TSDB_CODE_SUCCESS; } - task->candidateIdx = 0; - task->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); - if (NULL == task->candidateAddrs) { - qError("taosArrayInit failed"); + pTask->candidateIdx = 0; + pTask->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + if (NULL == pTask->candidateAddrs) { + SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CONDIDATE_EP_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (task->plan->execNode.numOfEps > 0) { - if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { - qError("taosArrayPush failed"); + if (pTask->plan->execNode.numOfEps > 0) { + if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { + SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("use execNode from plan as candidate addr"); + return TSDB_CODE_SUCCESS; } int32_t addNum = 0; - int32_t nodeNum = taosArrayGetSize(job->nodeList); + int32_t nodeNum = taosArrayGetSize(pJob->nodeList); for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { - SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); + SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); - if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { - qError("taosArrayPush failed"); + if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { + SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } ++addNum; } + if (addNum <= 0) { + SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum); + return TSDB_CODE_QRY_INVALID_INPUT; + } + /* for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); @@ -264,7 +272,7 @@ static int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } -static int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { +int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -275,7 +283,7 @@ static int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -static int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { +int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId); return TSDB_CODE_SUCCESS; @@ -291,7 +299,7 @@ static int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) return TSDB_CODE_SUCCESS; } -static int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { +int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId); } @@ -306,7 +314,7 @@ static int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) return TSDB_CODE_SUCCESS; } -static int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { +int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info @@ -316,7 +324,7 @@ static int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t err } -static int32_t schFetchFromRemote(SSchJob *job) { +int32_t schFetchFromRemote(SSchJob *job) { int32_t code = 0; if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { @@ -335,7 +343,7 @@ _return: } -static int32_t schProcessOnJobPartialSuccess(SSchJob *job) { +int32_t schProcessOnJobPartialSuccess(SSchJob *job) { job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; bool needFetch = job->userFetch; @@ -350,7 +358,7 @@ static int32_t schProcessOnJobPartialSuccess(SSchJob *job) { return TSDB_CODE_SUCCESS; } -static int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { +int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { job->status = JOB_TASK_STATUS_FAILED; job->errCode = errCode; @@ -363,57 +371,58 @@ static int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { return TSDB_CODE_SUCCESS; } -static int32_t schProcessOnDataFetched(SSchJob *job) { +int32_t schProcessOnDataFetched(SSchJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); tsem_post(&job->rspSem); } -static int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { +int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; - SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); + SCH_ERR_RET(schMoveTaskToSuccList(pJob, pTask, &moved)); if (!moved) { - SCH_TASK_ELOG(" task may already moved, status:%d", task->status); + SCH_TASK_ELOG(" task may already moved, status:%d", pTask->status); return TSDB_CODE_SUCCESS; } - task->status = JOB_TASK_STATUS_SUCCEED; + pTask->status = JOB_TASK_STATUS_SUCCEED; - int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0; + int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { - if (task->plan->level != 0) { - qError("level error"); + if (pTask->level->level != 0) { + SCH_TASK_ELOG("no parent task level error, level:%d", pTask->level->level); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } int32_t taskDone = 0; - if (SCH_TASK_NEED_WAIT_ALL(task)) { - SCH_LOCK(SCH_WRITE, &task->level->lock); - task->level->taskSucceed++; - taskDone = task->level->taskSucceed + task->level->taskFailed; - SCH_UNLOCK(SCH_WRITE, &task->level->lock); + if (SCH_TASK_NEED_WAIT_ALL(pTask)) { + SCH_LOCK(SCH_WRITE, &pTask->level->lock); + pTask->level->taskSucceed++; + taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; + SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); - if (taskDone < task->level->taskNum) { - qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum); + if (taskDone < pTask->level->taskNum) { + SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); return TSDB_CODE_SUCCESS; + } else if (taskDone > pTask->level->taskNum) { + assert(0); } - if (task->level->taskFailed > 0) { - job->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR)); + if (pTask->level->taskFailed > 0) { + pJob->status = JOB_TASK_STATUS_FAILED; + SCH_ERR_RET(schProcessOnJobFailure(pJob, TSDB_CODE_QRY_APP_ERROR)); return TSDB_CODE_SUCCESS; } } else { - strncpy(job->resEp.fqdn, task->execAddr.epAddr[task->execAddr.inUse].fqdn, sizeof(job->resEp.fqdn)); - job->resEp.port = task->execAddr.epAddr[task->execAddr.inUse].port; + pJob->resNode = pTask->execAddr; } - job->fetchTask = task; - SCH_ERR_RET(schProcessOnJobPartialSuccess(job)); + pJob->fetchTask = pTask; + SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob)); return TSDB_CODE_SUCCESS; } @@ -428,58 +437,58 @@ static int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { */ for (int32_t i = 0; i < parentNum; ++i) { - SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i); + SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i); - ++par->childReady; + atomic_add_fetch_32(&par->childReady, 1); - SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr)); + SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr)); if (SCH_TASK_READY_TO_LUNCH(par)) { - SCH_ERR_RET(schLaunchTask(job, par)); + SCH_ERR_RET(schLaunchTask(pJob, par)); } } return TSDB_CODE_SUCCESS; } -static int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { bool needRetry = false; bool moved = false; int32_t taskDone = 0; - SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); + SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); if (!needRetry) { SCH_TASK_ELOG("task failed[%x], no more retry", errCode); - SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); + SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved)); if (!moved) { - SCH_TASK_ELOG("task may already moved, status:%d", task->status); + SCH_TASK_ELOG("task may already moved, status:%d", pTask->status); } - if (SCH_TASK_NEED_WAIT_ALL(task)) { - SCH_LOCK(SCH_WRITE, &task->level->lock); - task->level->taskFailed++; - taskDone = task->level->taskSucceed + task->level->taskFailed; - SCH_UNLOCK(SCH_WRITE, &task->level->lock); + if (SCH_TASK_NEED_WAIT_ALL(pTask)) { + SCH_LOCK(SCH_WRITE, &pTask->level->lock); + pTask->level->taskFailed++; + taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; + SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); - if (taskDone < task->level->taskNum) { - qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum); + if (taskDone < pTask->level->taskNum) { + qDebug("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); return TSDB_CODE_SUCCESS; } } - job->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(job, errCode)); + pJob->status = JOB_TASK_STATUS_FAILED; + SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode)); return TSDB_CODE_SUCCESS; } - SCH_ERR_RET(schLaunchTask(job, task)); + SCH_ERR_RET(schLaunchTask(pJob, pTask)); return TSDB_CODE_SUCCESS; } -static int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { +int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; switch (msgType) { @@ -567,7 +576,7 @@ _return: } -static int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { +int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; @@ -593,32 +602,32 @@ _return: SCH_RET(code); } -static int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code); } -static int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code); } -static int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); } -static int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); } -static int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code); } -static int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { SSchCallbackParam *pParam = (SSchCallbackParam *)param; qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code); } -static int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { +int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { switch (msgType) { case TDMT_VND_CREATE_TABLE: *fp = schHandleCreateTableCallback; @@ -647,7 +656,7 @@ static int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { } -static int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { +int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { int32_t code = 0; SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { @@ -686,7 +695,7 @@ _return: SCH_RET(code); } -static void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { +void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { epSet->inUse = addr->inUse; epSet->numOfEps = addr->numOfEps; @@ -697,7 +706,7 @@ static void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { } -static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { +int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { uint32_t msgSize = 0; void *msg = NULL; int32_t code = 0; @@ -810,37 +819,40 @@ _return: } -static int32_t schLaunchTask(SSchJob *job, SSchTask *task) { - SSubplan *plan = task->plan; - SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); - SCH_ERR_RET(schSetTaskCandidateAddrs(job, task)); +int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { + SSubplan *plan = pTask->plan; + + SCH_ERR_RET(qSubPlanToString(plan, &pTask->msg, &pTask->msgLen)); + SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); - if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) { - SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, task->taskId); + if (NULL == pTask->candidateAddrs || taosArrayGetSize(pTask->candidateAddrs) <= 0) { + SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, pTask->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } // NOTE: race condition: the task should be put into the hash table before send msg to server - SCH_ERR_RET(schPushTaskToExecList(job, task)); - SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType)); + SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); + SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, plan->msgType)); - task->status = JOB_TASK_STATUS_EXECUTING; + pTask->status = JOB_TASK_STATUS_EXECUTING; + return TSDB_CODE_SUCCESS; } -static int32_t schLaunchJob(SSchJob *job) { - SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); +int32_t schLaunchJob(SSchJob *pJob) { + SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); + for (int32_t i = 0; i < level->taskNum; ++i) { - SSchTask *task = taosArrayGet(level->subTasks, i); - SCH_ERR_RET(schLaunchTask(job, task)); + SSchTask *pTask = taosArrayGet(level->subTasks, i); + SCH_ERR_RET(schLaunchTask(pJob, pTask)); } - job->status = JOB_TASK_STATUS_EXECUTING; + pJob->status = JOB_TASK_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; } -static void schDropJobAllTasks(SSchJob *job) { +void schDropJobAllTasks(SSchJob *job) { void *pIter = taosHashIterate(job->succTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; @@ -868,7 +880,7 @@ static void schDropJobAllTasks(SSchJob *job) { } } -static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) { +int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) { if (nodeList && taosArrayGetSize(nodeList) <= 0) { qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); } @@ -876,7 +888,7 @@ static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag int32_t code = 0; SSchJob *pJob = calloc(1, sizeof(SSchJob)); if (NULL == pJob) { - qError("QID:%"PRIx64" calloc %d failed", sizeof(SSchJob)); + qError("QID:%"PRIx64" calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -909,23 +921,27 @@ static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES); if (0 != code) { if (HASH_NODE_EXIST(code)) { - SCH_JOB_ELOG("job already exist, type:%d", pJob->attr.queryJob); + SCH_JOB_ELOG("job already exist, isQueryJob:%d", pJob->attr.queryJob); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } else { - qError("taosHashPut queryId:0x%"PRIx64" failed", pJob->queryId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + SCH_JOB_ELOG("taosHashPut job failed, errno:%d", errno); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } pJob->status = JOB_TASK_STATUS_NOT_START; + SCH_ERR_JRET(schLaunchJob(pJob)); *(SSchJob **)job = pJob; - + if (syncSchedule) { + SCH_JOB_DLOG("will wait for rsp now"); tsem_wait(&pJob->rspSem); } + SCH_JOB_DLOG("job exec done"); + return TSDB_CODE_SUCCESS; _return: @@ -960,12 +976,12 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (taosGetSystemUUID(&schMgmt.sId, sizeof(schMgmt.sId))) { + if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) { qError("generate schdulerId failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR); } - qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.cfg.maxJobNum); + qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum); return TSDB_CODE_SUCCESS; }