提交 a8e526a6 编写于 作者: D dapan1121

feat: support query redirect

上级 407db99a
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = true}; SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) { if (!gQWDebug.statusEnable) {
...@@ -138,6 +138,7 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int ...@@ -138,6 +138,7 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
.code = code, .code = code,
.info = *pConn, .info = *pConn,
}; };
rpcRsp.info.hasEpSet = 1;
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
...@@ -146,6 +147,35 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int ...@@ -146,6 +147,35 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
if (gQWDebug.tmp) {
if (TDMT_SCH_QUERY == qwMsg->msgType) {
SEpSet epSet = {0};
epSet.inUse = 1;
epSet.numOfEps = 3;
strcpy(epSet.eps[0].fqdn, "localhost");
epSet.eps[0].port = 7100;
strcpy(epSet.eps[1].fqdn, "localhost");
epSet.eps[1].port = 7200;
strcpy(epSet.eps[2].fqdn, "localhost");
epSet.eps[2].port = 7300;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
gQWDebug.tmp = false;
return TSDB_CODE_SUCCESS;
}
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) {
ctx->phase = QW_PHASE_POST_QUERY;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
gQWDebug.tmp = false;
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t qwDbgEnableDebug(char *option) { int32_t qwDbgEnableDebug(char *option) {
if (0 == strcasecmp(option, "lock")) { if (0 == strcasecmp(option, "lock")) {
......
...@@ -564,35 +564,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ...@@ -564,35 +564,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
} }
if (gQWDebug.tmp) {
#if 1
if (TDMT_SCH_QUERY == qwMsg->msgType) {
SEpSet epSet = {0};
epSet.inUse = 1;
epSet.numOfEps = 3;
strcpy(epSet.eps[0].fqdn, "localhost");
epSet.eps[0].port = 7100;
strcpy(epSet.eps[1].fqdn, "localhost");
epSet.eps[1].port = 7200;
strcpy(epSet.eps[2].fqdn, "localhost");
epSet.eps[2].port = 7300;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
gQWDebug.tmp = false;
return TSDB_CODE_SUCCESS;
}
#else
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) {
ctx->phase = QW_PHASE_POST_QUERY;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
gQWDebug.tmp = false;
return TSDB_CODE_SUCCESS;
}
#endif
}
_return: _return:
input.code = code; input.code = code;
......
...@@ -1684,7 +1684,7 @@ _return: ...@@ -1684,7 +1684,7 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) { int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0; int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) { if (schJobNeedToStop(pJob, &status)) {
...@@ -1711,6 +1711,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) { ...@@ -1711,6 +1711,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) {
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
if (pData) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
}
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
if (JOB_TASK_STATUS_EXECUTING == SCH_GET_TASK_STATUS(pTask)) { if (JOB_TASK_STATUS_EXECUTING == SCH_GET_TASK_STATUS(pTask)) {
SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
...@@ -1737,7 +1741,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) { ...@@ -1737,7 +1741,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) {
for (int32_t i = 0; i < childrenNum; ++i) { for (int32_t i = 0; i < childrenNum; ++i) {
SSchTask* pChild = taosArrayGetP(pTask->children, i); SSchTask* pChild = taosArrayGetP(pTask->children, i);
SCH_LOCK_TASK(pChild); SCH_LOCK_TASK(pChild);
schDoTaskRedirect(pJob, pChild, rspCode); schDoTaskRedirect(pJob, pChild, NULL, rspCode);
SCH_UNLOCK_TASK(pChild); SCH_UNLOCK_TASK(pChild);
} }
...@@ -1758,11 +1762,9 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 ...@@ -1758,11 +1762,9 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
} }
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
} }
SCH_RET(schDoTaskRedirect(pJob, pTask, rspCode)); SCH_RET(schDoTaskRedirect(pJob, pTask, pData, rspCode));
_return: _return:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册