From 072a73ec3d1d2b3547bc1ba8b2adec3d553978e4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 21 Nov 2022 11:35:52 +0800 Subject: [PATCH] enh: support max retry wait time configuration --- include/common/tglobal.h | 4 +++ source/common/src/tglobal.c | 8 +++++ source/libs/scheduler/inc/schInt.h | 2 ++ source/libs/scheduler/src/schRemote.c | 11 ++++-- source/libs/scheduler/src/schTask.c | 49 ++++++++++++++++----------- 5 files changed, 51 insertions(+), 23 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2076906f70..d23b1f519a 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -90,6 +90,10 @@ extern int32_t tsQueryNodeChunkSize; extern bool tsQueryUseNodeAllocator; extern bool tsKeepColumnName; extern bool tsEnableQueryHb; +extern int32_t tsRedirectPeriod; +extern int32_t tsRedirectFactor; +extern int32_t tsRedirectMaxPeriod; +extern int32_t tsMaxRetryWaitTime; // client extern int32_t tsMinSlidingTime; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 27dcbd5be3..cb82ad300b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -87,6 +87,10 @@ bool tsQueryPlannerTrace = false; int32_t tsQueryNodeChunkSize = 32 * 1024; bool tsQueryUseNodeAllocator = true; bool tsKeepColumnName = false; +int32_t tsRedirectPeriod = 100; +int32_t tsRedirectFactor = 5; +int32_t tsRedirectMaxPeriod = 10000; +int32_t tsMaxRetryWaitTime = 60000; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -301,6 +305,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1; if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1; tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); @@ -645,6 +650,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32; tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32; + tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; return 0; } @@ -860,6 +866,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; } else if (strcasecmp("maxMemUsedByInsert", name) == 0) { tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32; + } else if (strcasecmp("maxRetryWaitTime", name) == 0) { + tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; } break; } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 7cd8d608c2..83b58a77d2 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -27,6 +27,7 @@ extern "C" { #include "tarray.h" #include "thash.h" #include "trpc.h" +#include "ttimer.h" enum { SCH_READ = 1, @@ -507,6 +508,7 @@ extern SSchedulerMgmt schMgmt; void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); void schCleanClusterHb(void *pTrans); int32_t schLaunchTask(SSchJob *job, SSchTask *task); +int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); SSchJob *schAcquireJob(int64_t refId); int32_t schReleaseJob(int64_t refId); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index a6a2a6c301..4ebe07cf58 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -887,9 +887,14 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo)); SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask)); - qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId, - epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle); - + if (pJob && pTask) { + SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId, + epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle); + } else { + qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId, + epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle); + } + if (pTask) { pTask->lastMsgType = msgType; } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9262a9257c..0b235e63e9 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -340,7 +340,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) { +int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) { SSchRedirectCtx *pCtx = &pTask->redirectCtx; if (!pCtx->inRedirect) { pCtx->inRedirect = true; @@ -362,13 +362,6 @@ int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) { } pCtx->totalTimes++; - - int64_t nowTs = taosGetTimestampMs(); - if ((nowTs - pCtx->startTs) > tsMaxRetryWaitTime) { - SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", - nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); - SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR); - } if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) { pCtx->roundTotal = pEpSet->numOfEps; @@ -382,12 +375,21 @@ int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) { pCtx->roundTimes++; if (pCtx->roundTimes >= pCtx->roundTotal) { + int64_t nowTs = taosGetTimestampMs(); + int64_t lastTime = nowTs - pCtx->startTs; + if (lastTime > tsMaxRetryWaitTime) { + SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", + nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); + SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR); + } + pCtx->periodMs *= tsRedirectFactor; if (pCtx->periodMs > tsRedirectMaxPeriod) { pCtx->periodMs = tsRedirectMaxPeriod; } - pTask->delayExecMs = pCtx->periodMs; + int64_t leftTime = tsMaxRetryWaitTime - lastTime; + pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs; goto _return; } @@ -410,7 +412,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 pTask->retryTimes = 0; } - SCH_ERR_JRET(schChkUpdateRedirectCtx(pTask, pData ? pData->pEpSet : NULL)); + SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL)); pTask->waitRetry = true; @@ -431,6 +433,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SCH_SWITCH_EPSET(addr); SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); + } else { + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SEp *pEp = &addr->epSet.eps[addr->epSet.inUse]; + SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps, pEp->fqdn, pEp->port); } if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { @@ -1141,15 +1147,13 @@ void schHandleTimerEvent(void *param, void *tmrId) { SSchJob *pJob = NULL; int32_t code = 0; - SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pTimerParam->queryId, pTimerParam->rId, pTimerParam->taskId)); - - SCH_ERR_JRET(schLaunchTask(pJob, pTask)); - - return; + if (schProcessOnCbBegin(&pJob, &pTask, pTimerParam->queryId, pTimerParam->rId, pTimerParam->taskId)) { + return; + } -_return: + code = schLaunchTask(pJob, pTask); - schHandleJobFailure(pJob, code); + schProcessOnCbEnd(pJob, pTask, code); } int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { @@ -1157,7 +1161,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam)); if (NULL == param) { SCH_TASK_ELOG("taosMemoryMalloc %d failed", sizeof(SSchTimerParam)); - QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } param->rId = pJob->refId; @@ -1167,8 +1171,8 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { if (NULL == pTask->delayTimer) { pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer); if (NULL == pTask->delayTimer) { - SCH_TASK_ELOG("start delay timer failed"); - QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } return TSDB_CODE_SUCCESS; @@ -1203,7 +1207,12 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; + SCH_LOCK_TASK(pTask); + if (pTask->delayTimer) { + taosTmrStopA(&pTask->delayTimer); + } schDropTaskOnExecNode(pJob, pTask); + SCH_UNLOCK_TASK(pTask); pIter = taosHashIterate(list, pIter); } -- GitLab