diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 0fa01a304ca605b6feb1dda7528cc8a27a397b4b..68058334ab6985d13eed6edc2c80a86ffbeb0c86 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -9,7 +9,7 @@ #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = true}; +SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -138,6 +138,7 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int .code = code, .info = *pConn, }; + rpcRsp.info.hasEpSet = 1; tmsgSendRsp(&rpcRsp); @@ -146,6 +147,35 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int return TSDB_CODE_SUCCESS; } +int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { + if (gQWDebug.tmp) { + if (TDMT_SCH_QUERY == qwMsg->msgType) { + SEpSet epSet = {0}; + epSet.inUse = 1; + epSet.numOfEps = 3; + strcpy(epSet.eps[0].fqdn, "localhost"); + epSet.eps[0].port = 7100; + strcpy(epSet.eps[1].fqdn, "localhost"); + epSet.eps[1].port = 7200; + strcpy(epSet.eps[2].fqdn, "localhost"); + epSet.eps[2].port = 7300; + + qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet); + gQWDebug.tmp = false; + return TSDB_CODE_SUCCESS; + } + + if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) { + ctx->phase = QW_PHASE_POST_QUERY; + qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); + gQWDebug.tmp = false; + return TSDB_CODE_SUCCESS; + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t qwDbgEnableDebug(char *option) { if (0 == strcasecmp(option, "lock")) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index cd48452df2334cf14d6ff8058c7f6e50ac4abb69..949b67249fbfff0819d3fa7c3de0e1c03f172272 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -564,35 +564,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); } - - if (gQWDebug.tmp) { -#if 1 - if (TDMT_SCH_QUERY == qwMsg->msgType) { - SEpSet epSet = {0}; - epSet.inUse = 1; - epSet.numOfEps = 3; - strcpy(epSet.eps[0].fqdn, "localhost"); - epSet.eps[0].port = 7100; - strcpy(epSet.eps[1].fqdn, "localhost"); - epSet.eps[1].port = 7200; - strcpy(epSet.eps[2].fqdn, "localhost"); - epSet.eps[2].port = 7300; - - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet); - gQWDebug.tmp = false; - return TSDB_CODE_SUCCESS; - } -#else - if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) { - ctx->phase = QW_PHASE_POST_QUERY; - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); - gQWDebug.tmp = false; - return TSDB_CODE_SUCCESS; - } -#endif - } - - _return: input.code = code; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 29179536df554ab7621be3756e46d719137fb5ce..b2a96cbb23946153a75f503f45d82b0c49b206eb 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -1684,7 +1684,7 @@ _return: SCH_RET(code); } -int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) { +int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { int32_t code = 0; int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { @@ -1711,6 +1711,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) { memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + if (pData) { + SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); + } + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (JOB_TASK_STATUS_EXECUTING == SCH_GET_TASK_STATUS(pTask)) { SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); @@ -1737,7 +1741,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) { for (int32_t i = 0; i < childrenNum; ++i) { SSchTask* pChild = taosArrayGetP(pTask->children, i); SCH_LOCK_TASK(pChild); - schDoTaskRedirect(pJob, pChild, rspCode); + schDoTaskRedirect(pJob, pChild, NULL, rspCode); SCH_UNLOCK_TASK(pChild); } @@ -1758,11 +1762,9 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); SCH_ERR_JRET(rspCode); } - - SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); } - SCH_RET(schDoTaskRedirect(pJob, pTask, rspCode)); + SCH_RET(schDoTaskRedirect(pJob, pTask, pData, rspCode)); _return: