diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 10aba946569a1aa368a7d00a9295bcb5b38d0572..7f3a73f96e0827ed99d2991794e46304e3542d5b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1090,6 +1090,7 @@ typedef struct SResFetchMsg { } SResFetchMsg; typedef struct SSchTasksStatusMsg { + SMsgHead header; uint64_t sId; } SSchTasksStatusMsg; @@ -1105,6 +1106,7 @@ typedef struct SSchedulerStatusRsp { } SSchedulerStatusRsp; typedef struct STaskCancelMsg { + SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; @@ -1115,6 +1117,7 @@ typedef struct STaskCancelRsp { } STaskCancelRsp; typedef struct STaskDropMsg { + SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 2770f7e21a4dce62a7461eede23cc3e3f60332cf..fa4ae0d1521456392dc065734f24e252b5fc0598 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -60,17 +60,19 @@ typedef struct SSchLevel { typedef struct SSchTask { - uint64_t taskId; // task id - SSchLevel *level; // level - SSubplan *plan; // subplan - char *msg; // operator tree - int32_t msgLen; // msg length - int8_t status; // task status - SEpAddr execAddr; // task actual executed node address - SQueryProfileSummary summary; // task execution summary - int32_t childReady; // child task ready number - SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* - SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* + uint64_t taskId; // task id + SSchLevel *level; // level + SSubplan *plan; // subplan + char *msg; // operator tree + int32_t msgLen; // msg length + int8_t status; // task status + SQueryNodeAddr execAddr; // task actual executed node address + int8_t condidateIdx; // current try condidation index + SArray *condidateAddrs; // condidate node addresses, element is SQueryNodeAddr + SQueryProfileSummary summary; // task execution summary + int32_t childReady; // child task ready number + SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* + SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SSchTask; typedef struct SSchJobAttr { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 20eb94c2ff65333fda5834a77359188b9ee2cb2f..090c7b6fa75a71c9ceb82c36f8e9a18df3483b56 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -215,28 +215,49 @@ _return: SCH_RET(code); } -int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) { - if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { +int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { + if (task->condidateAddrs) { return TSDB_CODE_SUCCESS; } + task->condidateIdx = 0; + task->condidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + if (NULL == task->condidateAddrs) { + qError("taosArrayInit failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (task->plan->execNode.numOfEps > 0) { + if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; + } + + int32_t addNum = 0; int32_t nodeNum = taosArrayGetSize(job->nodeList); - for (int32_t i = 0; i < nodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) { - SEpAddr *addr = taosArrayGet(job->nodeList, i); + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { + SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); - strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn)); - epSet->port[epSet->numOfEps] = addr->port; + if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } - ++epSet->numOfEps; + ++addNum; } - for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) { +/* + for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; ++epSet->numOfEps; } +*/ return TSDB_CODE_SUCCESS; } @@ -394,12 +415,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } +/* if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) { strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; ++job->dataSrcEps.numOfEps; } +*/ for (int32_t i = 0; i < parentNum; ++i) { SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i); @@ -633,6 +656,16 @@ _return: SCH_RET(code); } +void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { + epSet->inUse = addr->inUse; + epSet->numOfEps = addr->numOfEps; + + for (int8_t i = 0; i < epSet->numOfEps; ++i) { + strncpy(epSet->fqdn[i], addr->epAddr[i].fqdn, sizeof(addr->epAddr[i].fqdn)); + epSet->port[i] = addr->epAddr[i].port; + } +} + int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { uint32_t msgSize = 0; @@ -665,6 +698,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { SSubQueryMsg *pMsg = msg; + pMsg->header.vgId = htonl(task->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); @@ -681,6 +715,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SResReadyMsg *pMsg = msg; + + pMsg->header.vgId = htonl(task->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); @@ -698,6 +734,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SResFetchMsg *pMsg = msg; + + pMsg->header.vgId = htonl(task->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); @@ -712,6 +750,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } STaskDropMsg *pMsg = msg; + + pMsg->header.vgId = htonl(task->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); @@ -723,7 +763,12 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { break; } - SCH_ERR_JRET(schAsyncSendMsg(job->transport, &task->plan->execEpSet, job->queryId, task->taskId, msgType, msg, msgSize)); + SEpSet epSet; + SQueryNodeAddr *addr = taosArrayGet(task->condidateAddrs, task->condidateIdx); + + schConvertAddrToEpSet(addr, &epSet); + + SCH_ERR_JRET(schAsyncSendMsg(job->transport, &epSet, job->queryId, task->taskId, msgType, msg, msgSize)); return TSDB_CODE_SUCCESS; @@ -737,12 +782,10 @@ _return: int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); - if (plan->execEpSet.numOfEps <= 0) { - SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet)); - } + SCH_ERR_RET(schSetTaskCondidateAddrs(job, task)); - if (plan->execEpSet.numOfEps <= 0) { - SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps); + if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) { + SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); }