diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index cbf0c4b2fe35ddb7948fc2f9a724b7bc4a7442a1..df14b0b34e18921196ded3b6dcfec3bc6cd892a3 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -201,12 +201,13 @@ enum { TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SCH_MERGE_QUERY, "merge-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "vnode-cancel-task", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "vnode-drop-task", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "vnode-explain", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "cancel-task", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL) TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 0b767e96f64e7f4528b51480d907b21ac2d0c2ad..3681c2f071ee2ae2bbf6f840c875e4920fe79de9 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -138,6 +138,7 @@ typedef struct SDataBuf { void* pData; uint32_t len; void* handle; + SEpSet* pEpSet; } SDataBuf; typedef struct STargetInfo { @@ -234,13 +235,24 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_CLIENT_HANDLE_ERROR(_code) \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) +#define NEED_REDIRECT_ERROR(_code) \ + ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ + (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || (_code) == TSDB_CODE_SYN_NOT_LEADER || \ + (_code) == TSDB_CODE_APP_NOT_READY) + #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ (_type) == TDMT_VND_DROP_STB) +#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ + ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \ + (_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY) + #define NEED_SCHEDULER_RETRY_ERROR(_code) \ - ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ - (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) + (NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) + + + #define REQUEST_TOTAL_EXEC_TIMES 2 diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index a36c7a004844d1a8616eb518c19f369cf914a4a4..4cc79096d157bb460c3c967ccf197d05eff396fc 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -89,9 +89,8 @@ void closeTransporter(SAppInstInfo *pAppInfo) { } static bool clientRpcRfp(int32_t code, tmsg_t msgType) { - if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || - code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_FETCH) { + if (NEED_REDIRECT_ERROR(code)) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { return false; } return true; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 632b8441c486cba140df24cc93af214c21525ac4..68a225f64c011e760e414de014ccc52af8b85eef 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -808,7 +808,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset); break; } - case TDMT_SCH_QUERY: { + case TDMT_SCH_QUERY: + case TDMT_SCH_MERGE_QUERY: { code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset); break; } @@ -1306,7 +1307,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet); - SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle}; + SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet}; if (pMsg->contLen > 0) { buf.pData = taosMemoryCalloc(1, pMsg->contLen); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index fbf337949031710b8c995ad4450c110a4cc86b87..59d68b2110cfd002a9bb02658df344a872db27b7 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -210,6 +210,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 1a7b5c1a1b069cda06461ff6a033d59c9e350e1d..5911daaa2bee45f869cd1f855b747234f83ab2c9 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -108,6 +108,7 @@ SArray *qmGetMsgHandles() { // Requests handled by VNODE if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 938e5f32b0bc36d213e21d1d289daee9c7b0fde7..5bc95825270439446802d38f3e9eaaec68eb2bdf 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -325,6 +325,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index c1b5b86d4cca4cd4ecdd9d7d09bad6619a449e4e..75e69e3716979018e8d4df228cf64dbe509f78ff 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -251,7 +251,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_FETCH) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { return false; } return true; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 0a9c5a82c8dc6a9fe50011325f8dcc56da1b3833..0f353b20a305236d9323d0fd2e34759401406d7f 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -527,9 +527,9 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; - if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || - pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT || pMsg->msgType == TDMT_SCH_FETCH || - pMsg->msgType == TDMT_SCH_DROP_TASK) { + if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY || + pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT || + pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) { return 0; } if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index b67a11997e7ded55f6bec336bc8165d566fe6c19..aec99fa3b75f65ebdb72d67ee6be4287be52cc89 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -19,13 +19,13 @@ #include "qworker.h" int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) { - if (TDMT_SCH_QUERY != pMsg->msgType) return 0; + if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return 0; SMnode *pMnode = pMsg->info.node; return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); } void mndPostProcessQueryMsg(SRpcMsg *pMsg) { - if (TDMT_SCH_QUERY != pMsg->msgType) return; + if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return; SMnode *pMnode = pMsg->info.node; qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); } @@ -38,6 +38,7 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { mTrace("msg:%p, in query queue is processing", pMsg); switch (pMsg->msgType) { case TDMT_SCH_QUERY: + case TDMT_SCH_MERGE_QUERY: code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg, 0); break; case TDMT_SCH_QUERY_CONTINUE: @@ -68,6 +69,7 @@ int32_t mndInitQuery(SMnode *pMnode) { } mndSetMsgHandle(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg); + mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index a0aa640ed609653474833e5fa8903240aaafd8a3..5909d6f599af97c0ffe8f4b2eea9fea97f504821 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -65,7 +65,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { } int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg) { - if (TDMT_SCH_QUERY != pMsg->msgType) { + if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) { return 0; } @@ -79,6 +79,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_SCH_QUERY: + case TDMT_SCH_MERGE_QUERY: code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg, ts); break; case TDMT_SCH_QUERY_CONTINUE: diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 63b7289661a809d0669bafd8a9c4f7e5a75788b3..64fa6c705d36a415d03ca2979264f0f7a3928023 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -216,7 +216,7 @@ _err: } int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { - if (TDMT_SCH_QUERY != pMsg->msgType) { + if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) { return 0; } @@ -228,6 +228,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_SCH_QUERY: + case TDMT_SCH_MERGE_QUERY: return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index c6c1f00395ba65204955662f20a486efeed4e272..4aad544e1910a31267c529620358b3a6ef86fadb 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -549,7 +549,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_FETCH) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { return false; } return true; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 77df96f82d4c5e2bae0db2d9f25c5359a875dc99..cbb12fee0ec8edcef8e3f68cdeb9ace2d718d7fb 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1555,7 +1555,11 @@ static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogic if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) { code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pSubplan); } else { - pSubplan->msgType = TDMT_SCH_QUERY; + if (SUBPLAN_TYPE_SCAN == pSubplan->subplanType) { + pSubplan->msgType = TDMT_SCH_QUERY; + } else { + pSubplan->msgType = TDMT_SCH_MERGE_QUERY; + } code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode); if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) { code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 40c5807057e1e9906b9ada21030f397993e64ccd..69d48b21790c80fbb8d30bbe3fce7eb6d5dde256 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -216,7 +216,8 @@ void destroyQueryExecRes(SQueryExecRes* pRes) { tFreeSSubmitRsp((SSubmitRsp*)pRes->res); break; } - case TDMT_SCH_QUERY: { + case TDMT_SCH_QUERY: + case TDMT_SCH_MERGE_QUERY: { taosArrayDestroy((SArray*)pRes->res); break; } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index a11979578788e4e4dc22cabc8da20ec414d9db33..4e36bd3f10846d4cdb351182df92cadbc8a0bc6a 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -383,6 +383,7 @@ char* schGetOpStr(SCH_OP_TYPE type); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob); int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes); +int32_t schUpdateTaskCandidateAddr(SSchTask *pTask, SEpSet* pEpSet); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 54d0d2f2e540712ba965b8ec75fc178ad5ab68fd..fd27193b0f8dc75805156eae005b637a6e497502 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -682,6 +682,25 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } +int32_t schUpdateTaskCandidateAddr(SSchTask *pTask, SEpSet* pEpSet) { + if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) { + SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0)); + SCH_ERR_RET(TSDB_CODE_APP_ERROR); + } + + SQueryNodeAddr* pAddr = taosArrayGet(pTask->candidateAddrs, 0); + + SEp* pOld = &pAddr->epSet.eps[pAddr->epSet.inUse]; + SEp* pNew = &pEpSet->eps[pEpSet->inUse]; + + SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port); + + memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet)); + + return TSDB_CODE_SUCCESS; +} + + int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) { int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId)); if (code) { @@ -821,7 +840,6 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; } - // TODO CHECK epList/condidateList if (SCH_IS_DATA_SRC_TASK(pTask)) { if ((pTask->execIdx + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { *needRetry = false; @@ -853,7 +871,6 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { - SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } @@ -1237,7 +1254,8 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { } SCH_LOCK_TASK(pTask); - if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) { + if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXECUTING == pTask->status && + pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) { SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx); schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); @@ -1281,7 +1299,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { continue; } - if (taskStatus->status == JOB_TASK_STATUS_NOT_START && SCH_TASK_TIMEOUT(pTask)) { + if (taskStatus->status == JOB_TASK_STATUS_NOT_START) { schRescheduleTask(pJob, pTask); } @@ -1657,5 +1675,86 @@ _return: SCH_RET(code); } +int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) { + int32_t code = 0; + + if ((pTask->execIdx + 1) >= pTask->maxExecTimes) { + SCH_TASK_DLOG("task no more retry since reach max try times, execIdx:%d", pTask->execIdx); + SCH_UNLOCK_TASK(pTask); + schProcessOnJobFailure(pJob, rspCode); + return TSDB_CODE_SUCCESS; + } + + SCH_TASK_DLOG("task will be redirected now, status:%d", SCH_GET_TASK_STATUS_STR(pTask)); + + schDropTaskOnExecNode(pJob, pTask); + taosHashClear(pTask->execNodes); + SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask)); + schDeregisterTaskHb(pJob, pTask); + atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); + taosMemoryFreeClear(pTask->msg); + pTask->msgLen = 0; + pTask->lastMsgType = 0; + memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); + + if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { + if (JOB_TASK_STATUS_EXECUTING == SCH_GET_TASK_STATUS(pTask)) { + SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); + } + } + } else { + pTask->childReady = 0; + + int32_t childrenNum = taosArrayGetSize(pTask->children); + for (int32_t i = 0; i < childrenNum; ++i) { + SSchTask* pChild = taosArrayGetP(pTask->children, i); + SCH_LOCK_TASK(pChild); + code = schDoTaskRedirect(pJob, pChild, rspCode); + SCH_UNLOCK_TASK(pChild); + SCH_ERR_JRET(code); + } + + qClearSubplanExecutionNode(pTask->plan); + } + + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); + + SCH_ERR_JRET(schLaunchTask(pJob, pTask)); + + SCH_UNLOCK_TASK(pTask); + + return TSDB_CODE_SUCCESS; + +_return: + + code = schProcessOnTaskFailure(pJob, pTask, code); + + SCH_UNLOCK_TASK(pTask); + + SCH_RET(code); +} + +int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SDataBuf* pData, int32_t rspCode) { + int32_t code = 0; + + if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + if (NULL == pData->pEpSet) { + SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); + SCH_ERR_JRET(rspCode); + } + + SCH_ERR_JRET(schUpdateTaskCandidateAddr(pTask, pData->pEpSet)); + } + + schDoTaskRedirect(pJob, pTask, rspCode); + +_return: + + schProcessOnTaskFailure(pJob, pTask, code); + + SCH_RET(code); +} + diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index a1691a3dd310e08e122baef6e6d7fa4e13fa6d7c..76d2061936f4224a0258a152be4fe7e6b352dc95 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -90,15 +90,6 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; - int8_t status = 0; - - if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), rspCode); - taosMemoryFreeClear(msg); - SCH_RET(atomic_load_32(&pJob->errCode)); - } - - SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType)); switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: { @@ -392,8 +383,23 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL); SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx)); + + int8_t status = 0; + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_ELOG("rsp will not be processed cause of job status %s, rspCode:0x%x", jobTaskStatusStr(status), rspCode); + code = atomic_load_32(&pJob->errCode); + goto _return; + } + + SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType)); + + if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || ((rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) && msgSize > 0)) { + code = schHandleRedirect(pJob, pTask, msgType, pMsg, rspCode); + goto _return; + } SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); + pMsg->pData = NULL; _return: @@ -405,6 +411,7 @@ _return: schReleaseJob(pParam->refId); } + taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(param); SCH_RET(code); } @@ -569,6 +576,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { *fp = schHandleSubmitCallback; break; case TDMT_SCH_QUERY: + case TDMT_SCH_MERGE_QUERY: *fp = schHandleQueryCallback; break; case TDMT_VND_DELETE: @@ -1032,7 +1040,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, tSerializeSVDeleteReq(msg, msgSize, &req); break; } - case TDMT_SCH_QUERY: { + case TDMT_SCH_QUERY: + case TDMT_SCH_MERGE_QUERY: { SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx)); uint32_t len = strlen(pJob->sql); @@ -1135,7 +1144,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); - if (msgType == TDMT_SCH_QUERY) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execIdx)); }