提交 7b6b3a72 编写于 作者: D dapan1121

fix send on broken rpc connection issue

上级 52988b6d
......@@ -300,7 +300,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp);
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
void schProcessOnDataFetched(SSchJob *job);
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode);
void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
......@@ -314,12 +314,11 @@ int32_t schCancelJob(SSchJob *pJob);
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
uint64_t schGenTaskId(void);
void schCloseJobRef(void);
int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SSchResInfo *pRes);
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SSchResInfo *pRes);
int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes);
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes);
int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode);
#ifdef __cplusplus
......
......@@ -342,6 +342,36 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad
return TSDB_CODE_SUCCESS;
}
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle) {
if (NULL == pTask->execNodes) {
return TSDB_CODE_SUCCESS;
}
int32_t num = taosArrayGetSize(pTask->execNodes);
for (int32_t i = 0; i < num; ++i) {
SSchNodeInfo* pNode = taosArrayGet(pTask->execNodes, i);
if (pNode->handle == handle) {
taosArrayRemove(pTask->execNodes, i);
break;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode) {
SCH_SET_TASK_HANDLE(pTask, handle);
schUpdateTaskExecNodeHandle(pTask, handle, rspCode);
if (msgType == TDMT_SCH_LINK_BROKEN) {
schDropTaskExecNode(pJob, pTask, handle);
}
return TSDB_CODE_SUCCESS;
}
int32_t schRecordQueryDataSrc(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
return TSDB_CODE_SUCCESS;
......@@ -1009,19 +1039,19 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
*/
for (int32_t i = 0; i < parentNum; ++i) {
SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1);
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
SCH_LOCK(SCH_WRITE, &par->lock);
SCH_LOCK(SCH_WRITE, &parent->lock);
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
.taskId = pTask->taskId,
.schedId = schMgmt.sId,
.addr = pTask->succeedAddr};
qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &par->lock);
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->lock);
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, par)) {
SCH_ERR_RET(schLaunchTask(pJob, par));
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
SCH_ERR_RET(schLaunchTask(pJob, parent));
}
}
......@@ -1095,7 +1125,7 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
return TSDB_CODE_SUCCESS;
}
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {
return TSDB_CODE_SUCCESS;
......@@ -1111,6 +1141,21 @@ int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **
return TSDB_CODE_SUCCESS;
}
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
schGetTaskFromList(pJob->execTasks, taskId, pTask);
if (NULL == *pTask) {
schGetTaskFromList(pJob->succTasks, taskId, pTask);
if (NULL == *pTask) {
SCH_JOB_ELOG("task not found in execList & succList, taskId:%" PRIx64, taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 ||
taosArrayGetSize(pTask->execNodes) <= 0) {
......
......@@ -358,27 +358,11 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
}
schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
if (NULL == pTask) {
if (TDMT_VND_EXPLAIN_RSP == msgType) {
schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
} else {
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
}
if (NULL == pTask) {
SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_ERR_JRET(schGetTaskInJob(pJob, pParam->taskId, &pTask));
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, rspCode));
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册