From 7b6b3a72b18df0031b97b49605fccc5284ae87e0 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 31 May 2022 17:05:27 +0800 Subject: [PATCH] fix send on broken rpc connection issue --- source/libs/scheduler/inc/schedulerInt.h | 9 ++-- source/libs/scheduler/src/schJob.c | 61 ++++++++++++++++++++---- source/libs/scheduler/src/schRemote.c | 20 +------- 3 files changed, 59 insertions(+), 31 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 3302a4b61a..09a4f322d4 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -300,7 +300,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask); int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp); int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp); void schProcessOnDataFetched(SSchJob *job); -int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); +int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask); int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode); void schFreeRpcCtxVal(const void *arg); int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb); @@ -314,12 +314,11 @@ int32_t schCancelJob(SSchJob *pJob); int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode); uint64_t schGenTaskId(void); void schCloseJobRef(void); -int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, - int64_t startTs, SSchResInfo *pRes); -int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, - int64_t startTs, SSchResInfo *pRes); +int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes); +int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes); int32_t schFetchRows(SSchJob *pJob); int32_t schAsyncFetchRows(SSchJob *pJob); +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 00bebcd5ac..1c60dcccfd 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -342,6 +342,36 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad return TSDB_CODE_SUCCESS; } +int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle) { + if (NULL == pTask->execNodes) { + return TSDB_CODE_SUCCESS; + } + + int32_t num = taosArrayGetSize(pTask->execNodes); + for (int32_t i = 0; i < num; ++i) { + SSchNodeInfo* pNode = taosArrayGet(pTask->execNodes, i); + if (pNode->handle == handle) { + taosArrayRemove(pTask->execNodes, i); + break; + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode) { + SCH_SET_TASK_HANDLE(pTask, handle); + + schUpdateTaskExecNodeHandle(pTask, handle, rspCode); + + if (msgType == TDMT_SCH_LINK_BROKEN) { + schDropTaskExecNode(pJob, pTask, handle); + } + + return TSDB_CODE_SUCCESS; +} + + int32_t schRecordQueryDataSrc(SSchJob *pJob, SSchTask *pTask) { if (!SCH_IS_DATA_SRC_QRY_TASK(pTask)) { return TSDB_CODE_SUCCESS; @@ -1009,19 +1039,19 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { */ for (int32_t i = 0; i < parentNum; ++i) { - SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i); - int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1); + SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i); + int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1); - SCH_LOCK(SCH_WRITE, &par->lock); + SCH_LOCK(SCH_WRITE, &parent->lock); SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, .taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr}; - qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source); - SCH_UNLOCK(SCH_WRITE, &par->lock); + qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); + SCH_UNLOCK(SCH_WRITE, &parent->lock); - if (SCH_TASK_READY_FOR_LAUNCH(readyNum, par)) { - SCH_ERR_RET(schLaunchTask(pJob, par)); + if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { + SCH_ERR_RET(schLaunchTask(pJob, parent)); } } @@ -1095,7 +1125,7 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { return TSDB_CODE_SUCCESS; } -int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) { +int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) { int32_t s = taosHashGetSize(pTaskList); if (s <= 0) { return TSDB_CODE_SUCCESS; @@ -1111,6 +1141,21 @@ int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask ** return TSDB_CODE_SUCCESS; } +int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) { + schGetTaskFromList(pJob->execTasks, taskId, pTask); + if (NULL == *pTask) { + schGetTaskFromList(pJob->succTasks, taskId, pTask); + + if (NULL == *pTask) { + SCH_JOB_ELOG("task not found in execList & succList, taskId:%" PRIx64, taskId); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + } + + return TSDB_CODE_SUCCESS; +} + + int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) { if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || taosArrayGetSize(pTask->execNodes) <= 0) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 312d587b6f..c15649106e 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -358,27 +358,11 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); } - schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask); - if (NULL == pTask) { - if (TDMT_VND_EXPLAIN_RSP == msgType) { - schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask); - } else { - SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, - pParam->taskId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - } - - if (NULL == pTask) { - SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, - pParam->taskId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); - } + SCH_ERR_JRET(schGetTaskInJob(pJob, pParam->taskId, &pTask)); SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); - SCH_SET_TASK_HANDLE(pTask, pMsg->handle); - schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode); + SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, rspCode)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); -- GitLab