提交 74859ae1 编写于 作者: dengyihao's avatar dengyihao

Merge branch 'enh/clientRetry' of https://github.com/taosdata/TDengine into retry

...@@ -90,6 +90,10 @@ extern int32_t tsQueryNodeChunkSize; ...@@ -90,6 +90,10 @@ extern int32_t tsQueryNodeChunkSize;
extern bool tsQueryUseNodeAllocator; extern bool tsQueryUseNodeAllocator;
extern bool tsKeepColumnName; extern bool tsKeepColumnName;
extern bool tsEnableQueryHb; extern bool tsEnableQueryHb;
// Redirect-retry tuning knobs read by the scheduler's redirect back-off logic:
//   tsRedirectPeriod    - initial retry delay, in milliseconds
//   tsRedirectFactor    - multiplier applied to the delay after each completed retry round
//   tsRedirectMaxPeriod - upper bound for the delay, in milliseconds
//   tsMaxRetryWaitTime  - total time budget for redirect retries, in milliseconds
extern int32_t tsRedirectPeriod;
extern int32_t tsRedirectFactor;
extern int32_t tsRedirectMaxPeriod;
extern int32_t tsMaxRetryWaitTime;
// client // client
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;
......
...@@ -259,9 +259,15 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -259,9 +259,15 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_HANDLE_ERROR(_code) \ #define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
// Classification of sync-layer error codes that should trigger a redirect.
// NOTE: the UNKNOWN_LEADER and SELF_LEADER predicates currently expand to the same
// test (NOT_LEADER or INTERNAL_ERROR); OTHER_LEADER is a placeholder that never matches.
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later
#define NEED_REDIRECT_ERROR(_code) \ #define NEED_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_NODE_NOT_DEPLOYED || (_code) == TSDB_CODE_SYN_NOT_LEADER || \ (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \
(_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK) (_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
...@@ -270,7 +276,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -270,7 +276,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
(_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY) SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \
SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_APP_NOT_READY)
#define REQUEST_TOTAL_EXEC_TIMES 2 #define REQUEST_TOTAL_EXEC_TIMES 2
......
...@@ -92,6 +92,7 @@ int32_t* taosGetErrno(); ...@@ -92,6 +92,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0129) #define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0129)
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x012A) #define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x012A)
#define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B) #define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B)
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
//client //client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
......
...@@ -87,6 +87,10 @@ bool tsQueryPlannerTrace = false; ...@@ -87,6 +87,10 @@ bool tsQueryPlannerTrace = false;
int32_t tsQueryNodeChunkSize = 32 * 1024; int32_t tsQueryNodeChunkSize = 32 * 1024;
bool tsQueryUseNodeAllocator = true; bool tsQueryUseNodeAllocator = true;
bool tsKeepColumnName = false; bool tsKeepColumnName = false;
// Redirect-retry back-off defaults (all durations in milliseconds).
// The retry delay starts at tsRedirectPeriod, is multiplied by tsRedirectFactor after
// each completed retry round, and is capped at tsRedirectMaxPeriod. Retrying gives up
// once tsMaxRetryWaitTime has elapsed since the first redirect.
int32_t tsRedirectPeriod = 100;
int32_t tsRedirectFactor = 5;
int32_t tsRedirectMaxPeriod = 10000;
int32_t tsMaxRetryWaitTime = 60000;
/* /*
* denote if the server needs to compress response message at the application layer to client, including query rsp, * denote if the server needs to compress response message at the application layer to client, including query rsp,
...@@ -301,6 +305,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { ...@@ -301,6 +305,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1; if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
...@@ -645,6 +650,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -645,6 +650,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32; tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32;
tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32; tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32;
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
return 0; return 0;
} }
...@@ -860,6 +866,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -860,6 +866,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
} else if (strcasecmp("maxMemUsedByInsert", name) == 0) { } else if (strcasecmp("maxMemUsedByInsert", name) == 0) {
tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32; tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32;
} else if (strcasecmp("maxRetryWaitTime", name) == 0) {
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
} }
break; break;
} }
......
...@@ -66,7 +66,7 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -66,7 +66,7 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
} }
pMsg->info.hasEpSet = 1; pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1}; SRpcMsg rsp = {.code = TSDB_CODE_SYN_NOT_LEADER, .info = pMsg->info, .msgType = pMsg->msgType + 1};
tmsgSendRedirectRsp(&rsp, &newEpSet); tmsgSendRedirectRsp(&rsp, &newEpSet);
} }
......
...@@ -27,6 +27,7 @@ extern "C" { ...@@ -27,6 +27,7 @@ extern "C" {
#include "tarray.h" #include "tarray.h"
#include "thash.h" #include "thash.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h"
enum { enum {
SCH_READ = 1, SCH_READ = 1,
...@@ -146,6 +147,7 @@ typedef struct SSchedulerMgmt { ...@@ -146,6 +147,7 @@ typedef struct SSchedulerMgmt {
int32_t jobRef; int32_t jobRef;
int32_t jobNum; int32_t jobNum;
SSchStat stat; SSchStat stat;
void *timer;
SRWLatch hbLock; SRWLatch hbLock;
SHashObj *hbConnections; SHashObj *hbConnections;
void *queryMgmt; void *queryMgmt;
...@@ -202,12 +204,30 @@ typedef struct SSchTaskProfile { ...@@ -202,12 +204,30 @@ typedef struct SSchTaskProfile {
int64_t endTs; int64_t endTs;
} SSchTaskProfile; } SSchTaskProfile;
// Per-task bookkeeping for redirect retries (back-off state and retry counters).
typedef struct SSchRedirectCtx {
// current back-off delay in ms; starts at tsRedirectPeriod and grows by tsRedirectFactor
int32_t periodMs;
// set to true when the first redirect of the task is handled
bool inRedirect;
// number of redirects handled after the first one
int32_t totalTimes;
// retries that make up one round: endpoint count for data-bind tasks, otherwise 1
int32_t roundTotal;
int32_t roundTimes; // retry times in current round
// timestamp (ms) of the first redirect; used to enforce tsMaxRetryWaitTime
int64_t startTs;
} SSchRedirectCtx;
// Argument handed to the delayed-launch timer callback; carries the ids needed to
// look the job and task up again when the timer fires.
typedef struct SSchTimerParam {
// job reference id (filled from pJob->refId)
int64_t rId;
// query id of the owning job (filled from pJob->queryId)
uint64_t queryId;
// id of the task to launch (filled from pTask->taskId)
uint64_t taskId;
} SSchTimerParam;
typedef struct SSchTask { typedef struct SSchTask {
uint64_t taskId; // task id uint64_t taskId; // task id
SRWLatch lock; // task reentrant lock SRWLatch lock; // task reentrant lock
int32_t maxExecTimes; // task max exec times int32_t maxExecTimes; // task max exec times
int32_t maxRetryTimes; // task max retry times int32_t maxRetryTimes; // task max retry times
int32_t retryTimes; // task retry times int32_t retryTimes; // task retry times
int32_t delayExecMs; // task execution delay time
tmr_h delayTimer; // task delay execution timer
SSchRedirectCtx redirectCtx; // task redirect context
bool waitRetry; // wait for retry bool waitRetry; // wait for retry
int32_t execId; // task current execute index int32_t execId; // task current execute index
SSchLevel *level; // level SSchLevel *level; // level
...@@ -488,6 +508,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -488,6 +508,7 @@ extern SSchedulerMgmt schMgmt;
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void schCleanClusterHb(void *pTrans); void schCleanClusterHb(void *pTrans);
int32_t schLaunchTask(SSchJob *job, SSchTask *task); int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId); SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId); int32_t schReleaseJob(int64_t refId);
...@@ -529,6 +550,7 @@ int32_t schJobFetchRows(SSchJob *pJob); ...@@ -529,6 +550,7 @@ int32_t schJobFetchRows(SSchJob *pJob);
int32_t schJobFetchRowsA(SSchJob *pJob); int32_t schJobFetchRowsA(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId); int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
char *schDumpEpSet(SEpSet *pEpSet);
char *schGetOpStr(SCH_OP_TYPE type); char *schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq); int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
......
...@@ -887,8 +887,13 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery ...@@ -887,8 +887,13 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo)); SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask)); SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
if (pJob && pTask) {
SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
} else {
qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId, qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle); epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
}
if (pTask) { if (pTask) {
pTask->lastMsgType = msgType; pTask->lastMsgType = msgType;
......
...@@ -340,6 +340,69 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -340,6 +340,69 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
 * Update the redirect bookkeeping of a task and decide how long the next launch
 * attempt should be delayed.
 *
 * On the first redirect the context is initialised (back-off period, start time and
 * the number of retries that make up one round). On later redirects the counters are
 * advanced; once a full round has been tried, the back-off period is multiplied by
 * tsRedirectFactor (capped at tsRedirectMaxPeriod) and stored in pTask->delayExecMs,
 * clipped to the time left in the tsMaxRetryWaitTime budget.
 *
 * @param pJob    owning job (used by the task log macros)
 * @param pTask   task being redirected; its redirectCtx and delayExecMs are updated
 * @param pEpSet  endpoint set received with the redirect, or NULL if none was supplied
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TIMEOUT_ERROR when the retry time budget
 *         is exhausted
 */
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
  SSchRedirectCtx *pCtx = &pTask->redirectCtx;

  if (!pCtx->inRedirect) {
    // first redirect for this task: set up the back-off state
    pCtx->inRedirect = true;
    pCtx->periodMs = tsRedirectPeriod;
    pCtx->startTs = taosGetTimestampMs();

    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      if (pEpSet) {
        pCtx->roundTotal = pEpSet->numOfEps;
      } else {
        // The candidate list may be missing or empty; do not dereference blindly.
        // Fall back to a single-attempt round in that case.
        SQueryNodeAddr *pAddr = pTask->candidateAddrs ? taosArrayGet(pTask->candidateAddrs, 0) : NULL;
        pCtx->roundTotal = pAddr ? pAddr->epSet.numOfEps : 1;
      }
    } else {
      pCtx->roundTotal = 1;
    }

    goto _return;
  }

  pCtx->totalTimes++;

  if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
    // a fresh endpoint set was supplied: start a new round without delay
    pCtx->roundTotal = pEpSet->numOfEps;
    pCtx->roundTimes = 0;
    pTask->delayExecMs = 0;
    goto _return;
  }

  pCtx->roundTimes++;

  // NOTE(review): roundTimes is not reset once a round completes, so every retry after
  // the first full round takes the delayed path below - confirm this is intended.
  if (pCtx->roundTimes >= pCtx->roundTotal) {
    int64_t nowTs = taosGetTimestampMs();
    int64_t lastTime = nowTs - pCtx->startTs;
    if (lastTime > tsMaxRetryWaitTime) {
      SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
                    nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
      SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR);
    }

    // grow the back-off period, bounded by the configured maximum
    pCtx->periodMs *= tsRedirectFactor;
    if (pCtx->periodMs > tsRedirectMaxPeriod) {
      pCtx->periodMs = tsRedirectMaxPeriod;
    }

    // never wait past the end of the overall retry budget
    int64_t leftTime = tsMaxRetryWaitTime - lastTime;
    pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;

    goto _return;
  }

  pTask->delayExecMs = 0;

_return:

  SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal, pCtx->totalTimes, pTask->delayExecMs);

  return TSDB_CODE_SUCCESS;
}
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
...@@ -349,14 +412,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 ...@@ -349,14 +412,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
pTask->retryTimes = 0; pTask->retryTimes = 0;
} }
if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) { SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL));
SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes,
pTask->maxExecTimes, pTask->execId);
schHandleJobFailure(pJob, rspCode);
return TSDB_CODE_SUCCESS;
}
pTask->waitRetry = true; pTask->waitRetry = true;
schDropTaskOnExecNode(pJob, pTask); schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes); taosHashClear(pTask->execNodes);
schRemoveTaskFromExecList(pJob, pTask); schRemoveTaskFromExecList(pJob, pTask);
...@@ -368,8 +427,16 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 ...@@ -368,8 +427,16 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pData) { if (pData && pData->pEpSet) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
} else if (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(rspCode)) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SCH_SWITCH_EPSET(addr);
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
} else {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps, pEp->fqdn, pEp->port);
} }
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
...@@ -380,7 +447,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 ...@@ -380,7 +447,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
SCH_ERR_JRET(schLaunchTask(pJob, pTask)); SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -428,28 +495,24 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 ...@@ -428,28 +495,24 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC); schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
} }
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
if (NULL == pData->pEpSet) { if (NULL == pData->pEpSet) {
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
code = rspCode; code = TSDB_CODE_INVALID_MSG;
goto _return; goto _return;
} }
} }
code = schDoTaskRedirect(pJob, pTask, pData, rspCode); code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
taosMemoryFree(pData->pData); taosMemoryFreeClear(pData->pData);
taosMemoryFree(pData->pEpSet); taosMemoryFreeClear(pData->pEpSet);
pData->pData = NULL;
pData->pEpSet = NULL;
SCH_RET(code); SCH_RET(code);
_return: _return:
taosMemoryFree(pData->pData); taosMemoryFreeClear(pData->pData);
taosMemoryFree(pData->pEpSet); taosMemoryFreeClear(pData->pEpSet);
pData->pData = NULL;
pData->pEpSet = NULL;
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
} }
...@@ -715,10 +778,10 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe ...@@ -715,10 +778,10 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0); SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
SEp *pOld = &pAddr->epSet.eps[pAddr->epSet.inUse]; char *origEpset = schDumpEpSet(&pAddr->epSet);
SEp *pNew = &pEpSet->eps[pEpSet->inUse]; char *newEpset = schDumpEpSet(pEpSet);
SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port); SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);
memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet)); memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
...@@ -1078,6 +1141,51 @@ _return: ...@@ -1078,6 +1141,51 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
} }
/**
 * Timer callback that launches a task whose execution was delayed.
 *
 * Takes ownership of the heap-allocated SSchTimerParam passed as `param`: the ids are
 * copied out and the parameter is released before the job/task lookup, so it is freed
 * on every path (including when the job or task can no longer be found).
 *
 * @param param  SSchTimerParam allocated by the code that armed the timer
 * @param tmrId  timer handle supplied by the timer module (unused)
 */
void schHandleTimerEvent(void *param, void *tmrId) {
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
  SSchTask       *pTask = NULL;
  SSchJob        *pJob = NULL;
  int32_t         code = 0;

  if (NULL == pTimerParam) {
    return;
  }

  // copy the ids, then release the parameter so it cannot leak on early return
  int64_t  rId = pTimerParam->rId;
  uint64_t queryId = pTimerParam->queryId;
  uint64_t taskId = pTimerParam->taskId;
  taosMemoryFree(pTimerParam);

  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, taskId)) {
    return;
  }

  code = schLaunchTask(pJob, pTask);

  schProcessOnCbEnd(pJob, pTask, code);
}
/**
 * Launch a task, honouring its configured execution delay.
 *
 * When pTask->delayExecMs is positive, a timer is armed (or the task's existing timer
 * is reset) to run schHandleTimerEvent after that many milliseconds, and the function
 * returns immediately. Otherwise the task is launched synchronously.
 *
 * @param pJob   owning job
 * @param pTask  task to launch
 * @return TSDB_CODE_SUCCESS when the timer was armed, TSDB_CODE_QRY_OUT_OF_MEMORY if
 *         the timer parameter or the timer could not be created, otherwise the result
 *         of schLaunchTask
 */
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->delayExecMs > 0) {
    SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
    if (NULL == param) {
      // sizeof yields size_t; cast so the argument matches the %d conversion
      SCH_TASK_ELOG("taosMemoryMalloc %d failed", (int32_t)sizeof(SSchTimerParam));
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    param->rId = pJob->refId;
    param->queryId = pJob->queryId;
    param->taskId = pTask->taskId;

    if (NULL == pTask->delayTimer) {
      pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
      if (NULL == pTask->delayTimer) {
        // the timer never took ownership of param, so release it here
        taosMemoryFree(param);
        SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      return TSDB_CODE_SUCCESS;
    }

    taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);

    return TSDB_CODE_SUCCESS;
  }

  SCH_RET(schLaunchTask(pJob, pTask));
}
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level)); SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));
...@@ -1099,7 +1207,12 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { ...@@ -1099,7 +1207,12 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
while (pIter) { while (pIter) {
SSchTask *pTask = *(SSchTask **)pIter; SSchTask *pTask = *(SSchTask **)pIter;
SCH_LOCK_TASK(pTask);
if (pTask->delayTimer) {
taosTmrStopA(&pTask->delayTimer);
}
schDropTaskOnExecNode(pJob, pTask); schDropTaskOnExecNode(pJob, pTask);
SCH_UNLOCK_TASK(pTask);
pIter = taosHashIterate(list, pIter); pIter = taosHashIterate(list, pIter);
} }
......
...@@ -36,6 +36,27 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { ...@@ -36,6 +36,27 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
return taosReleaseRef(schMgmt.jobRef, refId); return taosReleaseRef(schMgmt.jobRef, refId);
} }
/**
 * Render an endpoint set as a human-readable string for logging.
 *
 * The output has the form "numOfEps:N, inUse:I eps:[fqdn:port][fqdn:port]..." and is
 * truncated (but always NUL-terminated) if it does not fit into the 1024-byte buffer.
 *
 * @param pEpSet  endpoint set to dump; may be NULL
 * @return newly allocated string owned by the caller (allocated with taosMemoryMalloc),
 *         or NULL if pEpSet is NULL or the allocation fails
 */
char *schDumpEpSet(SEpSet *pEpSet) {
  if (NULL == pEpSet) {
    return NULL;
  }

  int32_t maxSize = 1024;
  char   *str = taosMemoryMalloc(maxSize);
  if (NULL == str) {
    return NULL;
  }

  int32_t n = snprintf(str, maxSize, "numOfEps:%d, inUse:%d eps:", pEpSet->numOfEps, pEpSet->inUse);
  if (n < 0) {
    // encoding error: hand back an empty, well-formed string
    str[0] = '\0';
    return str;
  }

  // snprintf returns the length it wanted to write; stop as soon as the buffer is full
  // so that (maxSize - n) can never go negative and wrap around as a size_t.
  for (int32_t i = 0; i < pEpSet->numOfEps && n < maxSize; ++i) {
    SEp    *pEp = &pEpSet->eps[i];
    int32_t len = snprintf(str + n, maxSize - n, "[%s:%d]", pEp->fqdn, pEp->port);
    if (len < 0) {
      break;
    }
    n += len;
  }

  return str;
}
char *schGetOpStr(SCH_OP_TYPE type) { char *schGetOpStr(SCH_OP_TYPE type) {
switch (type) { switch (type) {
case SCH_OP_NULL: case SCH_OP_NULL:
......
...@@ -48,6 +48,12 @@ int32_t schedulerInit() { ...@@ -48,6 +48,12 @@ int32_t schedulerInit() {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
schMgmt.timer = taosTmrInit(0, 0, 0, "scheduler");
if (NULL == schMgmt.timer) {
qError("init timer failed, error:%s", tstrerror(terrno));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) { if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
qError("generate schdulerId failed, errno:%d", errno); qError("generate schdulerId failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR); SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
......
...@@ -95,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") ...@@ -95,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
//client //client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册