未验证 提交 5250f305 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #14719 from taosdata/enh/redirect

enh: enhance query redirect processing
...@@ -325,11 +325,13 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) { ...@@ -325,11 +325,13 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
if (pInfo->pQnodeList) { if (pInfo->pQnodeList) {
taosArrayDestroy(pInfo->pQnodeList); taosArrayDestroy(pInfo->pQnodeList);
pInfo->pQnodeList = NULL; pInfo->pQnodeList = NULL;
tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
} }
if (pNodeList) { if (pNodeList) {
pInfo->pQnodeList = taosArrayDup(pNodeList); pInfo->pQnodeList = taosArrayDup(pNodeList);
taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad); taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%d", pInfo->clusterId, taosArrayGetSize(pInfo->pQnodeList));
} }
taosThreadMutexUnlock(&pInfo->qnodeMutex); taosThreadMutexUnlock(&pInfo->qnodeMutex);
......
...@@ -381,9 +381,11 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp) ...@@ -381,9 +381,11 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp)
if (pQnodeNum > 0) { if (pQnodeNum > 0) {
pRsp->query->pQnodeList = taosArrayInit(pQnodeNum, sizeof(SQueryNodeLoad)); pRsp->query->pQnodeList = taosArrayInit(pQnodeNum, sizeof(SQueryNodeLoad));
if (NULL == pRsp->query->pQnodeList) return -1; if (NULL == pRsp->query->pQnodeList) return -1;
SQueryNodeLoad load = {0}; for (int32_t i = 0; i < pQnodeNum; ++i) {
if (tDecodeSQueryNodeLoad(pDecoder, &load) < 0) return -1; SQueryNodeLoad load = {0};
taosArrayPush(pRsp->query->pQnodeList, &load); if (tDecodeSQueryNodeLoad(pDecoder, &load) < 0) return -1;
taosArrayPush(pRsp->query->pQnodeList, &load);
}
} }
} }
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false}; SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = true};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) { if (!gQWDebug.statusEnable) {
......
...@@ -35,7 +35,6 @@ extern "C" { ...@@ -35,7 +35,6 @@ extern "C" {
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 #define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000 #define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_TASK_MAX_EXEC_TIMES 8
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
enum { enum {
...@@ -179,10 +178,10 @@ typedef struct SSchLevel { ...@@ -179,10 +178,10 @@ typedef struct SSchLevel {
} SSchLevel; } SSchLevel;
typedef struct SSchTaskProfile { typedef struct SSchTaskProfile {
int64_t startTs; int64_t startTs;
int64_t execUseTime[SCH_TASK_MAX_EXEC_TIMES]; int64_t* execTime;
int64_t waitTime; int64_t waitTime;
int64_t endTs; int64_t endTs;
} SSchTaskProfile; } SSchTaskProfile;
typedef struct SSchTask { typedef struct SSchTask {
...@@ -260,33 +259,7 @@ typedef struct SSchJob { ...@@ -260,33 +259,7 @@ typedef struct SSchJob {
extern SSchedulerMgmt schMgmt; extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_START_TS(_task) \ #define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execTime[(_task)->execId % (_task)->maxExecTimes]) > (_task)->timeoutUsec)
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us; \
if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \
} \
} while (0)
#define SCH_LOG_TASK_WAIT_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.waitTime += us - (_task)->profile.execUseTime[idx]; \
} while (0)
#define SCH_LOG_TASK_END_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us - (_task)->profile.execUseTime[idx]; \
(_task)->profile.endTs = us; \
} while (0)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execId % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec)
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) #define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
...@@ -320,6 +293,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -320,6 +293,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) #define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_TASK_MAX_EXEC_TIMES(_levelIdx, _levelNum) (SCH_MAX_CANDIDATE_EP_NUM * ((_levelNum) - (_levelIdx)))
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
...@@ -328,16 +302,43 @@ extern SSchedulerMgmt schMgmt; ...@@ -328,16 +302,43 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0)) #define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task))))
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) #define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen))) #define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) (SCH_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) #define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps) #define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
#define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps) #define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps)
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
(_task)->profile.execTime[idx] = us; \
if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \
} \
} while (0)
#define SCH_LOG_TASK_WAIT_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
(_task)->profile.waitTime += us - (_task)->profile.execTime[idx]; \
} while (0)
#define SCH_LOG_TASK_END_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
(_task)->profile.execTime[idx] = us - (_task)->profile.execTime[idx]; \
(_task)->profile.endTs = us; \
} while (0)
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
...@@ -431,7 +432,8 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask); ...@@ -431,7 +432,8 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask);
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -337,7 +337,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { ...@@ -337,7 +337,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_SET_JOB_TYPE(pJob, plan->subplanType); SCH_SET_JOB_TYPE(pJob, plan->subplanType);
SSchTask task = {0}; SSchTask task = {0};
SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel)); SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel, levelNum));
SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task); SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
if (NULL == pTask) { if (NULL == pTask) {
......
...@@ -85,7 +85,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa ...@@ -85,7 +85,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType)); SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) { if (SCH_TASK_NEED_REDIRECT(pTask, reqType, rspCode, pMsg->len)) {
SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode)); SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode));
} }
......
...@@ -46,21 +46,31 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -46,21 +46,31 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
} }
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) {
int32_t code = 0;
pTask->plan = pPlan; pTask->plan = pPlan;
pTask->level = pLevel; pTask->level = pLevel;
pTask->execId = -1; pTask->execId = -1;
pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES; pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES(pLevel->level, levelNum);
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
pTask->taskId = schGenTaskId(); pTask->taskId = schGenTaskId();
pTask->execNodes = taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTask->execNodes = taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (NULL == pTask->execNodes) { pTask->profile.execTime = taosMemoryCalloc(pTask->maxExecTimes, sizeof(int64_t));
SCH_TASK_ELOG("taosHashInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM); if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(pTask->profile.execTime);
taosHashCleanup(pTask->execNodes);
SCH_RET(code);
} }
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
...@@ -338,6 +348,11 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 ...@@ -338,6 +348,11 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
qClearSubplanExecutionNode(pTask->plan); qClearSubplanExecutionNode(pTask->plan);
// Note: current error task and upper level merge task
if ((pData && 0 == pData->len) || NULL == pData) {
SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
int32_t childrenNum = taosArrayGetSize(pTask->children); int32_t childrenNum = taosArrayGetSize(pTask->children);
...@@ -531,10 +546,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { ...@@ -531,10 +546,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
SCH_SWITCH_EPSET(&pTask->plan->execNode); SCH_SWITCH_EPSET(&pTask->plan->execNode);
} else { } else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
if (++pTask->candidateIdx >= candidateNum) {
pTask->candidateIdx = 0;
}
} }
SCH_ERR_RET(schLaunchTask(pJob, pTask)); SCH_ERR_RET(schLaunchTask(pJob, pTask));
...@@ -633,6 +645,16 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSe ...@@ -633,6 +645,16 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSe
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if (++pTask->candidateIdx >= candidateNum) {
pTask->candidateIdx = 0;
}
SCH_TASK_DLOG("switch task candiateIdx to %d", pTask->candidateIdx);
return TSDB_CODE_SUCCESS;
}
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) { int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId)); int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册