From 3cab6df16b85e31d4e473d0d77efd176b77f8102 Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 5 Mar 2022 17:19:21 +0800 Subject: [PATCH] feature/scheduler --- include/libs/qcom/query.h | 2 +- include/libs/scheduler/scheduler.h | 2 +- source/libs/scheduler/inc/schedulerInt.h | 5 +- source/libs/scheduler/src/schFlowCtrl.c | 22 ++- source/libs/scheduler/src/scheduler.c | 8 +- source/libs/scheduler/test/schedulerTests.cpp | 138 ++++++++++++------ 6 files changed, 123 insertions(+), 54 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 84c246dbf5..5aa3ca1acf 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -134,7 +134,7 @@ typedef struct SQueryNodeAddr { } SQueryNodeAddr; typedef struct SQueryNodeStat { - double tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT + int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; int32_t initTaskQueue(); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 4f0f0b4953..56da9ece6f 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -25,7 +25,7 @@ extern "C" { typedef struct SSchedulerCfg { uint32_t maxJobNum; - double maxNodeTableNum; + int32_t maxNodeTableNum; } SSchedulerCfg; typedef struct SQueryProfileSummary { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 7813251bfb..2af10a0e2d 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -28,7 +28,8 @@ extern "C" { #define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 -#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20.0 /* in million */ +#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20 // unit is TSDB_TABLE_NUM_UNIT + #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA @@ -78,7 +79,7 @@ typedef struct SSchCallbackParam { typedef struct SSchFlowControl { SRWLatch lock; - double tableNumSum; + int32_t tableNumSum; uint32_t execTaskNum; SArray *taskList; // Element is SQueryTask* } SSchFlowControl; diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 2b3b5e20ae..1dc99354a1 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -42,10 +42,11 @@ void schFreeFlowCtrl(SSchLevel *pLevel) { int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { if (!SCH_IS_QUERY_JOB(pJob)) { + SCH_JOB_DLOG("job no need flow ctrl, queryJob:%d", SCH_IS_QUERY_JOB(pJob)); return TSDB_CODE_SUCCESS; } - double sum = 0; + int32_t sum = 0; for (int32_t i = 0; i < pLevel->taskNum; ++i) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); @@ -54,6 +55,7 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { } if (sum < schMgmt.cfg.maxNodeTableNum) { + SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum); return TSDB_CODE_SUCCESS; } @@ -65,6 +67,8 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { SCH_SET_JOB_NEED_FLOW_CTRL(pJob); + SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%d", sum); + return TSDB_CODE_SUCCESS; } @@ -89,6 +93,9 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) { --ctrl->execTaskNum; ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum; + SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", + ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); + _return: SCH_UNLOCK(SCH_WRITE, &ctrl->lock); @@ -100,13 +107,14 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { SSchLevel *pLevel = pTask->level; int32_t code = 0; SSchFlowControl *ctrl = NULL; + SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); do { - ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp)); + ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); if (NULL == ctrl) { SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1}; - code = taosHashPut(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp), &nctrl, sizeof(nctrl)); + code = taosHashPut(pLevel->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl)); if (code) { if (HASH_NODE_EXIST(code)) { continue; @@ -116,6 +124,9 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", + ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum); + *enough = true; return TSDB_CODE_SUCCESS; } @@ -130,7 +141,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { break; } - double sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum; + int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum; if (sum <= schMgmt.cfg.maxNodeTableNum) { ctrl->tableNumSum = sum; @@ -160,6 +171,9 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { _return: + SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", + ((*enough)?"":"NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); SCH_RET(code); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index a5e419e00b..32cac59b81 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -130,6 +130,8 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + atomic_store_32(&pTask->lastMsgType, -1); + return TSDB_CODE_SUCCESS; } @@ -500,6 +502,8 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); + } else { + SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); } int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); @@ -735,6 +739,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; int32_t code = 0; + SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); @@ -1689,7 +1695,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { SCH_JOB_ELOG("job failed or dropping, status:%d", status); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } else if (status == JOB_TASK_STATUS_SUCCEED) { - SCH_JOB_ELOG("job already succeed, status:%d", status); + SCH_JOB_DLOG("job already succeed, status:%d", status); goto _return; } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(pJob)); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 73060fcf6c..663df5bcd1 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -126,6 +126,68 @@ void schtBuildQueryDag(SQueryPlan *dag) { nodesListAppend(dag->pSubplans, (SNode*)scan); } +void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { + uint64_t qId = schtQueryId; + int32_t scanPlanNum = 20; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = nodesMakeList(); + SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); + SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); + + SSubplan *scanPlan = (SSubplan *)calloc(scanPlanNum, sizeof(SSubplan)); + SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan)); + + merge->pNodeList = nodesMakeList(); + scan->pNodeList = nodesMakeList(); + + mergePlan->pChildren = nodesMakeList(); + + for (int32_t i = 0; i < scanPlanNum; ++i) { + scanPlan[i].id.queryId = qId; + scanPlan[i].id.templateId = 0x0000000000000002; + scanPlan[i].id.subplanId = 0x0000000000000003 + i; + scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN; + + scanPlan[i].execNode.nodeId = 1 + i; + scanPlan[i].execNode.epset.inUse = 0; + scanPlan[i].execNodeStat.tableNum = rand() % 30; + addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep0", 6030); + addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep1", 6030); + addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep2", 6030); + scanPlan[i].execNode.epset.inUse = rand() % 3; + + scanPlan[i].pChildren = NULL; + scanPlan[i].level = 1; + scanPlan[i].pParents = nodesMakeList(); + scanPlan[i].pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); + scanPlan[i].msgType = TDMT_VND_QUERY; + + nodesListAppend(scanPlan[i].pParents, (SNode*)mergePlan); + nodesListAppend(mergePlan->pChildren, (SNode*)(scanPlan + i)); + + nodesListAppend(scan->pNodeList, (SNode*)(scanPlan + i)); + } + + mergePlan->id.queryId = qId; + mergePlan->id.templateId = schtMergeTemplateId; + mergePlan->id.subplanId = 0x5555; + mergePlan->subplanType = SUBPLAN_TYPE_MERGE; + mergePlan->level = 0; + mergePlan->execNode.epset.numOfEps = 0; + + mergePlan->pParents = NULL; + mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); + mergePlan->msgType = TDMT_VND_QUERY; + + nodesListAppend(merge->pNodeList, (SNode*)mergePlan); + + nodesListAppend(dag->pSubplans, (SNode*)merge); + nodesListAppend(dag->pSubplans, (SNode*)scan); +} + + void schtFreeQueryDag(SQueryPlan *dag) { } @@ -650,6 +712,8 @@ TEST(queryTest, flowCtrlCase) { schtInitLogFile(); + srand(time(NULL)); + SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); SEp qnodeAddr = {0}; @@ -660,7 +724,7 @@ TEST(queryTest, flowCtrlCase) { int32_t code = schedulerInit(NULL); ASSERT_EQ(code, 0); - schtBuildQueryDag(&dag); + schtBuildQueryFlowCtrlDag(&dag); schtSetPlanToString(); schtSetExecNode(); @@ -671,54 +735,38 @@ TEST(queryTest, flowCtrlCase) { SSchJob *pJob = schAcquireJob(job); - - void *pIter = taosHashIterate(pJob->execTasks, NULL); - while (pIter) { - SSchTask *task = *(SSchTask **)pIter; - SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); - - ASSERT_EQ(code, 0); - taosHashCancelIterate(pJob->execTasks, pIter); - pIter = NULL; - } - - pIter = taosHashIterate(pJob->execTasks, NULL); - while (pIter) { - SSchTask *task = *(SSchTask **)pIter; - - SResReadyRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); - printf("code:%d", code); - ASSERT_EQ(code, 0); - taosHashCancelIterate(pJob->execTasks, pIter); - pIter = NULL; - } - - pIter = taosHashIterate(pJob->execTasks, NULL); - while (pIter) { - SSchTask *task = *(SSchTask **)pIter; - - SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + bool queryDone = false; + + while (!queryDone) { + void *pIter = taosHashIterate(pJob->execTasks, NULL); + if (NULL == pIter) { + break; + } - ASSERT_EQ(code, 0); - taosHashCancelIterate(pJob->execTasks, pIter); - pIter = NULL; - } + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; - pIter = taosHashIterate(pJob->execTasks, NULL); - while (pIter) { - SSchTask *task = *(SSchTask **)pIter; + taosHashCancelIterate(pJob->execTasks, pIter); + + if (task->lastMsgType == TDMT_VND_QUERY) { + SQueryTableRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + } else if (task->lastMsgType == TDMT_VND_RES_READY) { + SResReadyRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + ASSERT_EQ(code, 0); + } else { + queryDone = true; + break; + } + + pIter = NULL; + } + } - SResReadyRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); - ASSERT_EQ(code, 0); - - taosHashCancelIterate(pJob->execTasks, pIter); - pIter = NULL; - } pthread_attr_t thattr; pthread_attr_init(&thattr); -- GitLab