未验证 提交 94f976ca 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #12860 from taosdata/feature/qnode

feat: support schedule based on level
......@@ -132,7 +132,7 @@ typedef struct SSchLevel {
int32_t taskSucceed;
int32_t taskNum;
int32_t taskLaunchedNum;
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
int32_t taskDoneNum;
SArray *subTasks; // Element is SQueryTask
} SSchLevel;
......@@ -175,11 +175,13 @@ typedef struct SSchJob {
SArray *levels; // starting from 0. SArray<SSchLevel>
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx;
SEpSet dataSrcEps;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
SExplainCtx *explainCtx;
int8_t status;
......@@ -200,7 +202,7 @@ typedef struct SSchJob {
extern SSchedulerMgmt schMgmt;
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
// True once readyNum has reached the number of entries in (task)->children.
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
// Yields (_task)->taskId, or -1 when _task is NULL. Note: _task is evaluated twice.
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
// Atomically stores _type into (_task)->lastMsgType; does nothing when _task is NULL.
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
......@@ -223,7 +225,7 @@ extern SSchedulerMgmt schMgmt;
// Sets the job's attr.needFlowCtrl flag to true.
// NOTE(review): expands to a bare assignment with no enclosing parentheses or do/while,
// so it is only safe when used as a standalone statement.
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
// Reads the job's attr.needFlowCtrl flag.
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
// Sets the job's attr.queryJob flag: true for every subplan type except SUBPLAN_TYPE_MODIFY.
// NOTE(review): expands to a bare assignment; use only as a standalone statement.
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
// Reads the job's attr.queryJob flag.
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
......@@ -261,7 +263,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId);
void schFreeFlowCtrl(SSchLevel *pLevel);
void schFreeFlowCtrl(SSchJob *pJob);
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
......
......@@ -19,13 +19,13 @@
#include "catalog.h"
#include "tref.h"
void schFreeFlowCtrl(SSchLevel *pLevel) {
if (NULL == pLevel->flowCtrl) {
void schFreeFlowCtrl(SSchJob *pJob) {
if (NULL == pJob->flowCtrl) {
return;
}
SSchFlowControl *ctrl = NULL;
void *pIter = taosHashIterate(pLevel->flowCtrl, NULL);
void *pIter = taosHashIterate(pJob->flowCtrl, NULL);
while (pIter) {
ctrl = (SSchFlowControl *)pIter;
......@@ -33,11 +33,11 @@ void schFreeFlowCtrl(SSchLevel *pLevel) {
taosArrayDestroy(ctrl->taskList);
}
pIter = taosHashIterate(pLevel->flowCtrl, pIter);
pIter = taosHashIterate(pJob->flowCtrl, pIter);
}
taosHashCleanup(pLevel->flowCtrl);
pLevel->flowCtrl = NULL;
taosHashCleanup(pJob->flowCtrl);
pJob->flowCtrl = NULL;
}
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
......@@ -47,9 +47,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
}
int32_t sum = 0;
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks);
for (int32_t i = 0; i < taskNum; ++i) {
SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
sum += pTask->plan->execNodeStat.tableNum;
}
......@@ -59,9 +59,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
return TSDB_CODE_SUCCESS;
}
pLevel->flowCtrl = taosHashInit(pLevel->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pLevel->flowCtrl) {
SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pLevel->taskNum);
pJob->flowCtrl = taosHashInit(pJob->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pJob->flowCtrl) {
SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pJob->taskNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -78,7 +78,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
int32_t code = 0;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
......@@ -110,11 +110,11 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
do {
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1};
code = taosHashPut(pLevel->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl));
code = taosHashPut(pJob->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl));
if (code) {
if (HASH_NODE_EXIST(code)) {
continue;
......@@ -273,10 +273,9 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
SSchLevel *pLevel = pTask->level;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
......
......@@ -391,6 +391,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("children info, the %d child TID %" PRIx64, n, (*childTask)->taskId);
}
if (parentNum > 0) {
......@@ -423,6 +425,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("parents info, the %d parent TID %" PRIx64, n, (*parentTask)->taskId);
}
SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
......@@ -464,6 +468,17 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad
return TSDB_CODE_SUCCESS;
}
// Appends pTask to pJob->dataSrcTasks when it is a data-source query task
// (as decided by SCH_IS_DATA_SRC_QRY_TASK); any other task is ignored.
//
// Returns TSDB_CODE_SUCCESS when the task was recorded or did not need recording,
// and TSDB_CODE_QRY_OUT_OF_MEMORY when the array could not take the new element.
int32_t schRecordQueryDataSrc(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  // The array stores SSchTask pointers, hence the address of the pointer is pushed.
  // A failed push must not be reported as success: the task would silently be
  // missing from dataSrcTasks, which later code iterates over.
  // NOTE(review): assumes taosArrayPush returns NULL on failure, matching the
  // other push sites in this file — confirm against tarray.h.
  if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
    SCH_TASK_ELOG("taosArrayPush to dataSrcTasks failed, TID %" PRIx64, pTask->taskId);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
int32_t code = 0;
pJob->queryId = pDag->queryId;
......@@ -473,6 +488,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
if (NULL == pJob->dataSrcTasks) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
if (levelNum <= 0) {
SCH_JOB_ELOG("invalid level num:%d", levelNum);
......@@ -551,6 +571,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schRecordQueryDataSrc(pJob, p));
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -629,6 +651,17 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
// Removes pTask from the job's executing-task hash (pJob->execTasks), keyed by taskId.
//
// Returns TSDB_CODE_SUCCESS when the entry was removed; when taosHashRemove
// reports a non-zero code the failure is logged and TSDB_CODE_SCH_INTERNAL_ERROR
// is returned through SCH_ERR_RET.
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  const void *key = &pTask->taskId;
  size_t      keyLen = sizeof(pTask->taskId);

  int32_t code = taosHashRemove(pJob->execTasks, key, keyLen);
  if (0 != code) {
    SCH_TASK_ELOG("task failed to rm from execTask list, code:%x", code);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
......@@ -774,6 +807,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
......@@ -947,6 +983,32 @@ _return:
SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
// Accounts for one more finished task in pTask's level and, once every task of
// that level is done, moves the job to the previous level index and launches the
// tasks there that have no children.
//
// Only applies to query jobs (SCH_IS_QUERY_JOB); for other jobs it is a no-op.
// Returns TSDB_CODE_SUCCESS, or the error propagated from schLaunchTask.
int32_t schLaunchNextLevelTasks(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  SSchLevel *pDoneLevel = pTask->level;
  int32_t    doneNum = atomic_add_fetch_32(&pDoneLevel->taskDoneNum, 1);
  if (doneNum != pDoneLevel->taskNum) {
    // Other tasks of this level are still outstanding.
    return TSDB_CODE_SUCCESS;
  }

  pJob->levelIdx--;
  if (pJob->levelIdx < 0) {
    // The level that just completed was the last one; there is nothing left to
    // launch, and indexing pJob->levels with a negative value would be out of bounds.
    return TSDB_CODE_SUCCESS;
  }

  SSchLevel *pNextLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
  for (int32_t i = 0; i < pNextLevel->taskNum; ++i) {
    // Distinct name so the parameter pTask is not shadowed.
    SSchTask *pNextTask = taosArrayGet(pNextLevel->subTasks, i);

    // Tasks with children are skipped here; only child-less tasks are launched.
    if (pNextTask->children && taosArrayGetSize(pNextTask->children) > 0) {
      continue;
    }

    SCH_ERR_RET(schLaunchTask(pJob, pNextTask));
  }

  return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
......@@ -1015,11 +1077,13 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &par->lock);
if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
SCH_ERR_RET(schLaunchTaskImpl(pJob, par));
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, par)) {
SCH_ERR_RET(schLaunchTask(pJob, par));
}
}
SCH_ERR_RET(schLaunchNextLevelTasks(pJob, pTask));
return TSDB_CODE_SUCCESS;
_return:
......@@ -2400,8 +2464,6 @@ void schFreeJobImpl(void *job) {
for (int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
schFreeFlowCtrl(pLevel);
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for (int32_t j = 0; j < numOfTasks; ++j) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
......@@ -2411,12 +2473,15 @@ void schFreeJobImpl(void *job) {
taosArrayDestroy(pLevel->subTasks);
}
schFreeFlowCtrl(pJob);
taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks);
taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList);
taosArrayDestroy(pJob->dataSrcTasks);
qExplainFreeCtx(pJob->explainCtx);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册