提交 8dd9cc74 编写于 作者: D dapan

scheduler job code

上级 d61e2bd1
......@@ -105,6 +105,8 @@ typedef struct SSubplan {
} SSubplan;
typedef struct SQueryDag {
uint64_t queryId;
int32_t numOfSubplans;
SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
} SQueryDag;
......
......@@ -44,14 +44,16 @@ typedef struct SSchedulerMgmt {
} SSchedulerMgmt;
typedef struct SQueryTask {
uint64_t taskId; // task id
SSubplan *plan; // subplan
char *msg; // operator tree
int8_t status; // task status
SEpAddr execAddr; // task actual executed node address
SQueryProfileSummary summary; // task execution summary
SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
uint64_t taskId; // task id
SSubplan *plan; // subplan
char *msg; // operator tree
int8_t status; // task status
SEpAddr execAddr; // task actual executed node address
SQueryProfileSummary summary; // task execution summary
int32_t childReady; // child task ready number
SArray *childSrcEp; // child Eps, element is SEpAddr
SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
} SQueryTask;
typedef struct SQueryLevel {
......@@ -67,11 +69,16 @@ typedef struct SQueryJob {
int32_t levelIdx;
int8_t status;
SQueryProfileSummary summary;
SArray *levels; // Element is SQueryLevel, starting from 0.
SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SArray *levels; // Element is SQueryLevel, starting from 0.
SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
} SQueryJob;
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
......
......@@ -53,7 +53,7 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
for (int32_t i = 0; i < job->levelNum; ++i) {
SQueryLevel *level = taosArrayGet(job->levels, i);
for (int32_t m = 0; m < level->taskNum; ++m) {
SQueryTask *task = taosArrayGet(level->subTasks, m);
SSubplan *plan = task->plan;
......@@ -106,12 +106,31 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
}
}
SQueryLevel *level = taosArrayGet(job->levels, 0);
if (level->taskNum > 1) {
qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SQueryTask *task = taosArrayGet(level->subTasks, 0);
if (task->parents && taosArrayGetSize(task->parents) > 0) {
qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
return TSDB_CODE_SUCCESS;
}
int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
int32_t code = 0;
if (dag->numOfSubplans <= 0) {
qError("invalid subplan num:%d", dag->numOfSubplans);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
if (levelNum <= 0) {
qError("invalid level num:%d", levelNum);
......@@ -213,47 +232,126 @@ int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) {
}
int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) {
}
int32_t schProcessOnJobSuccess(SQueryJob *job) {
}
int32_t schProcessOnJobFailure(SQueryJob *job) {
}
int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
bool moved = false;
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
if (!moved) {
qWarn("task[%d] already moved", task->taskId);
return TSDB_CODE_SUCCESS;
}
int32_t parentNum = (int32_t)taosArrayGetSize(task->parents);
if (parentNum == 0) {
if (task->plan->level != 0) {
qError("level error");
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_RET(schProcessOnJobSuccess());
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < parentNum; ++i) {
SQueryTask *par = taosArrayGet(task->parents, i);
++par->childReady;
if (NULL == taosArrayPush(par->childSrcEp, &task->execAddr)) {
qError("taosArrayPush failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (SCH_TASK_READY_TO_LUNCH(par)) {
SCH_ERR_RET(schTaskRun(job, task));
}
}
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) {
bool needRetry = false;
SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
if (!needRetry) {
SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
job->status = SCH_STATUS_FAILED;
SCH_ERR_RET(schProcessOnJobFailure(job));
return TSDB_CODE_SUCCESS;
}
SCH_ERR_RET(schTaskRun(job, task));
return TSDB_CODE_SUCCESS;
}
int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) {
if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
qError("taosHashPut failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
return TSDB_CODE_SUCCESS
}
if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
qError("taosHashPut failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
*moved = true;
return TSDB_CODE_SUCCESS;
}
int32_t schTaskRun(SQueryJob *job, SQueryTask *task) {
SSubplan *plan = task->plan;
switch (task->status) {
case SCH_STATUS_NOT_START:
SCH_ERR_RET(qSubPlanToString(plan, &task->msg));
if (plan->execEpSet.numOfEps <= 0) {
SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet));
}
SCH_ERR_RET(schAsyncLaunchTask(job, task));
break;
case SCH_STATUS_EXECUTING:
break;
case SCH_STATUS_SUCCEED:
break;
default:
SCH_JOB_ERR_LOG("invalid level status:%d, levelIdx:%d", job->status, job->levelIdx);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
SCH_ERR_RET(qSubPlanToString(plan, &task->msg));
if (plan->execEpSet.numOfEps <= 0) {
SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet));
}
SCH_ERR_RET(schAsyncLaunchTask(job, task));
SCH_ERR_RET(schPushTaskToExecList(job, task))
return TSDB_CODE_SUCCESS;
}
int32_t schJobRun(SQueryJob *job) {
bool cont = true;
while (cont) {
switch (job->status) {
case SCH_STATUS_NOT_START:
case SCH_STATUS_EXECUTING:
break;
default:
SCH_JOB_ERR_LOG("invalid job status:%d", job->status);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx);
for (int32_t i = 0; i < level->taskNum; ++i) {
SQueryTask *task = taosArrayGet(level->subTasks, i);
SCH_ERR_RET(schTaskRun(job, task));
}
return TSDB_CODE_SUCCESS;
}
......@@ -281,6 +379,18 @@ int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) {
SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == job->execTasks) {
qError("taosHashInit %d failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == job->succTasks) {
qError("taosHashInit %d failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schJobRun(job));
*(SQueryJob **)pJob = job;
......@@ -299,8 +409,10 @@ int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data);
int32_t scheduleCancelJob(void *pRpc, void *pJob);
void scheduleFreeJob(void *pJob) {
void scheduleFreeJob(void *job) {
if (NULL == job) {
return;
}
}
void schedulerDestroy(void) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册