提交 3cab6df1 编写于 作者: D dapan

feature/scheduler

上级 21af31f4
......@@ -134,7 +134,7 @@ typedef struct SQueryNodeAddr {
} SQueryNodeAddr;
typedef struct SQueryNodeStat {
double tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
int32_t initTaskQueue();
......
......@@ -25,7 +25,7 @@ extern "C" {
typedef struct SSchedulerCfg {
uint32_t maxJobNum;
double maxNodeTableNum;
int32_t maxNodeTableNum;
} SSchedulerCfg;
typedef struct SQueryProfileSummary {
......
......@@ -28,7 +28,8 @@ extern "C" {
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20.0 /* in million */
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20 // unit is TSDB_TABLE_NUM_UNIT
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
......@@ -78,7 +79,7 @@ typedef struct SSchCallbackParam {
typedef struct SSchFlowControl {
SRWLatch lock;
double tableNumSum;
int32_t tableNumSum;
uint32_t execTaskNum;
SArray *taskList; // Element is SQueryTask*
} SSchFlowControl;
......
......@@ -42,10 +42,11 @@ void schFreeFlowCtrl(SSchLevel *pLevel) {
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
if (!SCH_IS_QUERY_JOB(pJob)) {
SCH_JOB_DLOG("job no need flow ctrl, queryJob:%d", SCH_IS_QUERY_JOB(pJob));
return TSDB_CODE_SUCCESS;
}
double sum = 0;
int32_t sum = 0;
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
......@@ -54,6 +55,7 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
}
if (sum < schMgmt.cfg.maxNodeTableNum) {
SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum);
return TSDB_CODE_SUCCESS;
}
......@@ -65,6 +67,8 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
SCH_SET_JOB_NEED_FLOW_CTRL(pJob);
SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%d", sum);
return TSDB_CODE_SUCCESS;
}
......@@ -89,6 +93,9 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
--ctrl->execTaskNum;
ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;
SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
_return:
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
......@@ -100,13 +107,14 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SSchLevel *pLevel = pTask->level;
int32_t code = 0;
SSchFlowControl *ctrl = NULL;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
do {
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp));
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1};
code = taosHashPut(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp), &nctrl, sizeof(nctrl));
code = taosHashPut(pLevel->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl));
if (code) {
if (HASH_NODE_EXIST(code)) {
continue;
......@@ -116,6 +124,9 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);
*enough = true;
return TSDB_CODE_SUCCESS;
}
......@@ -130,7 +141,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
break;
}
double sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
if (sum <= schMgmt.cfg.maxNodeTableNum) {
ctrl->tableNumSum = sum;
......@@ -160,6 +171,9 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
_return:
SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
((*enough)?"":"NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
SCH_RET(code);
......
......@@ -130,6 +130,8 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
atomic_store_32(&pTask->lastMsgType, -1);
return TSDB_CODE_SUCCESS;
}
......@@ -500,6 +502,8 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
} else {
SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
}
int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
......@@ -735,6 +739,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
int32_t code = 0;
SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
......@@ -1689,7 +1695,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
SCH_JOB_ELOG("job failed or dropping, status:%d", status);
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
} else if (status == JOB_TASK_STATUS_SUCCEED) {
SCH_JOB_ELOG("job already succeed, status:%d", status);
SCH_JOB_DLOG("job already succeed, status:%d", status);
goto _return;
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
......
......@@ -126,6 +126,68 @@ void schtBuildQueryDag(SQueryPlan *dag) {
nodesListAppend(dag->pSubplans, (SNode*)scan);
}
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
uint64_t qId = schtQueryId;
int32_t scanPlanNum = 20;
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList();
SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
SSubplan *scanPlan = (SSubplan *)calloc(scanPlanNum, sizeof(SSubplan));
SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan));
merge->pNodeList = nodesMakeList();
scan->pNodeList = nodesMakeList();
mergePlan->pChildren = nodesMakeList();
for (int32_t i = 0; i < scanPlanNum; ++i) {
scanPlan[i].id.queryId = qId;
scanPlan[i].id.templateId = 0x0000000000000002;
scanPlan[i].id.subplanId = 0x0000000000000003 + i;
scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN;
scanPlan[i].execNode.nodeId = 1 + i;
scanPlan[i].execNode.epset.inUse = 0;
scanPlan[i].execNodeStat.tableNum = rand() % 30;
addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep0", 6030);
addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep1", 6030);
addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep2", 6030);
scanPlan[i].execNode.epset.inUse = rand() % 3;
scanPlan[i].pChildren = NULL;
scanPlan[i].level = 1;
scanPlan[i].pParents = nodesMakeList();
scanPlan[i].pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode));
scanPlan[i].msgType = TDMT_VND_QUERY;
nodesListAppend(scanPlan[i].pParents, (SNode*)mergePlan);
nodesListAppend(mergePlan->pChildren, (SNode*)(scanPlan + i));
nodesListAppend(scan->pNodeList, (SNode*)(scanPlan + i));
}
mergePlan->id.queryId = qId;
mergePlan->id.templateId = schtMergeTemplateId;
mergePlan->id.subplanId = 0x5555;
mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
mergePlan->level = 0;
mergePlan->execNode.epset.numOfEps = 0;
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_VND_QUERY;
nodesListAppend(merge->pNodeList, (SNode*)mergePlan);
nodesListAppend(dag->pSubplans, (SNode*)merge);
nodesListAppend(dag->pSubplans, (SNode*)scan);
}
void schtFreeQueryDag(SQueryPlan *dag) {
}
......@@ -650,6 +712,8 @@ TEST(queryTest, flowCtrlCase) {
schtInitLogFile();
srand(time(NULL));
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SEp qnodeAddr = {0};
......@@ -660,7 +724,7 @@ TEST(queryTest, flowCtrlCase) {
int32_t code = schedulerInit(NULL);
ASSERT_EQ(code, 0);
schtBuildQueryDag(&dag);
schtBuildQueryFlowCtrlDag(&dag);
schtSetPlanToString();
schtSetExecNode();
......@@ -671,54 +735,38 @@ TEST(queryTest, flowCtrlCase) {
SSchJob *pJob = schAcquireJob(job);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
taosHashCancelIterate(pJob->execTasks, pIter);
pIter = NULL;
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
printf("code:%d", code);
ASSERT_EQ(code, 0);
taosHashCancelIterate(pJob->execTasks, pIter);
pIter = NULL;
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
bool queryDone = false;
while (!queryDone) {
void *pIter = taosHashIterate(pJob->execTasks, NULL);
if (NULL == pIter) {
break;
}
ASSERT_EQ(code, 0);
taosHashCancelIterate(pJob->execTasks, pIter);
pIter = NULL;
}
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
taosHashCancelIterate(pJob->execTasks, pIter);
if (task->lastMsgType == TDMT_VND_QUERY) {
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
} else if (task->lastMsgType == TDMT_VND_RES_READY) {
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
} else {
queryDone = true;
break;
}
pIter = NULL;
}
}
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
taosHashCancelIterate(pJob->execTasks, pIter);
pIter = NULL;
}
pthread_attr_t thattr;
pthread_attr_init(&thattr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册