提交 7f61dbc8 编写于 作者: D dapan1121

fix rpc connection fail issue

上级 fc04aa84
...@@ -365,7 +365,7 @@ int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *p ...@@ -365,7 +365,7 @@ int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *p
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes); int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes);
int32_t schFetchRows(SSchJob *pJob); int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob); int32_t schAsyncFetchRows(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx); int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
......
...@@ -396,8 +396,8 @@ int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) { ...@@ -396,8 +396,8 @@ int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx) { int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx) {
if (msgType == TDMT_SCH_LINK_BROKEN) { if (dropExecNode) {
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execIdx)); SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execIdx));
} }
......
...@@ -372,7 +372,8 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in ...@@ -372,7 +372,8 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
goto _return; goto _return;
} }
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, pParam->execIdx)); bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL);
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx));
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册