diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 2257ba8328869f09c4aa9975d975c0e3ea5af643..1b893739bd846f8e94911c03bbd562d13d6a0370 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -35,7 +35,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); - } + } if (taskStatus != JOB_TASK_STATUS_PART_SUCC) { SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); @@ -75,7 +75,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { // Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; - char *msg = pMsg->pData; + char *msg = pMsg->pData; int32_t msgSize = pMsg->len; int32_t msgType = pMsg->msgType; @@ -253,15 +253,15 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa rsp->sversion = ntohl(rsp->sversion); rsp->tversion = ntohl(rsp->tversion); rsp->affectedRows = be64toh(rsp->affectedRows); - + SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); - taosMemoryFreeClear(msg); - + taosMemoryFreeClear(msg); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; @@ -375,7 +375,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SSchTask *pTask = NULL; SSchJob *pJob = NULL; - qDebug("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); + qDebug("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, + tstrerror(rspCode)); SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId)); @@ -387,7 +388,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(param); - qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); + qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, + tstrerror(rspCode)); SCH_RET(code); } @@ -424,7 +426,7 @@ int32_t schHandleCommitCallback(void *param, SDataBuf *pMsg, int32_t code) { } int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { - SSchedulerHbRsp rsp = {0}; + SSchedulerHbRsp rsp = {0}; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; if (code) { @@ -453,8 +455,8 @@ _return: SCH_RET(code); } - -int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans, void **pParam) { +int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans, + void **pParam) { if (!isHb) { SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam)); if (NULL == param) { @@ -940,7 +942,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, if (NULL == addr) { addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); isCandidateAddr = true; - SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx); + SCH_TASK_DLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse, + addr->epSet.numOfEps); } switch (msgType) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index a6621d279d10ce72cfb4a5dad2945d9b71aa9b1f..4275bea3f092a4a07d01c8f7c81fa4f1b42cd343 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -21,11 +21,9 @@ #include "tref.h" #include "trpc.h" - - void schFreeTask(SSchJob *pJob, SSchTask *pTask) { schDeregisterTaskHb(pJob, pTask); - + if (pTask->candidateAddrs) { taosArrayDestroy(pTask->candidateAddrs); } @@ -45,17 +43,17 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { } } - int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) { int32_t code = 0; - + pTask->plan = pPlan; pTask->level = pLevel; pTask->execId = -1; pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES(pLevel->level, levelNum); pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->taskId = schGenTaskId(); - pTask->execNodes = taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + pTask->execNodes = + taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTask->profile.execTime = taosMemoryCalloc(pTask->maxExecTimes, sizeof(int64_t)); if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -110,8 +108,8 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ } else { SCH_TASK_DLOG("execId %d removed from execNodeList", execId); } - - if (execId != pTask->execId) { // ignore it + + if (execId != pTask->execId) { // ignore it SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); } @@ -149,13 +147,13 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { return TSDB_CODE_SCH_IGNORE_ERROR; } - + int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(status)); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); } - + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) { SCH_TASK_ELOG("task already not in EXEC status, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); @@ -204,8 +202,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_RET(errCode); } - - // Note: no more task error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; @@ -265,13 +261,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1); SCH_LOCK_TASK(parent); - SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, - .taskId = pTask->taskId, - .schedId = schMgmt.sId, - .execId = pTask->execId, - .addr = pTask->succeedAddr, - .fetchMsgType = SCH_FETCH_TYPE(pTask), - }; + SDownstreamSourceNode source = { + .type = QUERY_NODE_DOWNSTREAM_SOURCE, + .taskId = pTask->taskId, + .schedId = schMgmt.sId, + .execId = pTask->execId, + .addr = pTask->succeedAddr, + .fetchMsgType = SCH_FETCH_TYPE(pTask), + }; qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK_TASK(parent); @@ -291,29 +288,29 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && - pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) { + if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask && + taosArrayGetSize(pTask->candidateAddrs) > 1) { SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId); schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); - + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR)); } return TSDB_CODE_SUCCESS; } -int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { +int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; - + if ((pTask->execId + 1) >= pTask->maxExecTimes) { SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); - schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&rspCode); + schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void *)&rspCode); return TSDB_CODE_SUCCESS; } SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); - + schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask)); @@ -328,25 +325,24 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 if (pData) { SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); } - + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) { SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } - } + } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); - + SCH_ERR_JRET(schLaunchTask(pJob, pTask)); return TSDB_CODE_SUCCESS; } - // merge plan - + pTask->childReady = 0; - + qClearSubplanExecutionNode(pTask->plan); // Note: current error task and upper level merge task @@ -355,10 +351,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); - + int32_t childrenNum = taosArrayGetSize(pTask->children); for (int32_t i = 0; i < childrenNum; ++i) { - SSchTask* pChild = taosArrayGetP(pTask->children, i); + SSchTask *pChild = taosArrayGetP(pTask->children, i); SCH_LOCK_TASK(pChild); schDoTaskRedirect(pJob, pChild, NULL, rspCode); SCH_UNLOCK_TASK(pChild); @@ -371,7 +367,7 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } -int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { +int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; if (SCH_IS_DATA_BIND_TASK(pTask)) { @@ -537,7 +533,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); - + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } @@ -545,7 +541,8 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { schDeregisterTaskHb(pJob, pTask); if (SCH_IS_DATA_BIND_TASK(pTask)) { - SCH_SWITCH_EPSET(&pTask->plan->execNode); + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SCH_SWITCH_EPSET(addr); } else { SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask)); } @@ -558,20 +555,21 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { int32_t addNum = 0; int32_t nodeNum = 0; - + if (pJob->nodeList) { nodeNum = taosArrayGetSize(pJob->nodeList); for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i); SQueryNodeAddr *naddr = &nload->addr; - + if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port); + SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, + SCH_GET_CUR_EP(naddr)->port); ++addNum; } @@ -585,7 +583,6 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { if (NULL != pTask->candidateAddrs) { return TSDB_CODE_SUCCESS; @@ -628,16 +625,17 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet) { +int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) { if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) { - SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0)); + SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", + (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0)); SCH_ERR_RET(TSDB_CODE_APP_ERROR); } - SQueryNodeAddr* pAddr = taosArrayGet(pTask->candidateAddrs, 0); + SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0); - SEp* pOld = &pAddr->epSet.eps[pAddr->epSet.inUse]; - SEp* pNew = &pEpSet->eps[pEpSet->inUse]; + SEp *pOld = &pAddr->epSet.eps[pAddr->epSet.inUse]; + SEp *pNew = &pEpSet->eps[pEpSet->inUse]; SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port); @@ -647,7 +645,7 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSe } int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { - int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); + int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); if (++pTask->candidateIdx >= candidateNum) { pTask->candidateIdx = 0; } @@ -655,8 +653,6 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - - int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) { int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId)); if (code) { @@ -692,33 +688,32 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_DLOG("task has been dropped on %d exec nodes", size); } - - -int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { - int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); +int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { + int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); SSchTask *pTask = NULL; - SSchJob *pJob = NULL; + SSchJob *pJob = NULL; - qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, pEpId->ep.port); + qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, + pEpId->ep.port); for (int32_t i = 0; i < taskNum; ++i) { STaskStatus *pStatus = taosArrayGet(pStatusList, i); - int32_t code = 0; - - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", - pStatus->queryId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status)); + int32_t code = 0; + + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId, + pStatus->execId, jobTaskStatusStr(pStatus->status)); if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) { continue; } if (pStatus->execId != pTask->execId) { - //TODO + // TODO SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId); schProcessOnCbEnd(pJob, pTask, 0); continue; } - + if (pStatus->status == JOB_TASK_STATUS_FAIL) { // RECORD AND HANDLE ERROR!!!! schProcessOnCbEnd(pJob, pTask, 0); @@ -832,7 +827,6 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { } } - // Note: no more error processing, handled in function internal int32_t schLaunchFetchTask(SSchJob *pJob) { int32_t code = 0; @@ -851,5 +845,3 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code)); } - - diff --git a/tests/script/tsim/sync/vnodesnapshot-test.sim b/tests/script/tsim/sync/vnodesnapshot-test.sim index 837903af37fe40afacb0ae0fbb2e8194ab155648..3cf8cb4d9379ef137421b949ea57a1f85dd65ff4 100644 --- a/tests/script/tsim/sync/vnodesnapshot-test.sim +++ b/tests/script/tsim/sync/vnodesnapshot-test.sim @@ -201,7 +201,7 @@ system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode3 -s start system sh/exec.sh -n dnode4 -s start -sleep 3000 +sleep 7000 print =============== query data sql connect