diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 70ac7a630460a2917f11b93984e4c1434567e6ef..e6973cd390c10ff524f70549d161090582ee56ab 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -25,11 +25,6 @@ extern "C" { extern tsem_t schdRspSem; -typedef struct SSchedulerCfg { - uint32_t maxJobNum; - int32_t maxNodeTableNum; -} SSchedulerCfg; - typedef struct SQueryProfileSummary { int64_t startTs; // Object created and added into the message queue int64_t endTs; // the timestamp when the task is completed @@ -84,7 +79,7 @@ typedef struct SSchedulerReq { } SSchedulerReq; -int32_t schedulerInit(SSchedulerCfg *cfg); +int32_t schedulerInit(void); int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob); @@ -96,6 +91,8 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub); void schedulerStopQueryHb(void *pTrans); +int32_t schedulerUpdatePolicy(int32_t policy); +int32_t schedulerEnableReSchedule(bool enableResche); /** * Cancel query job diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 9e67dc6571aae2401bda0a18f348a602dad1790d..da9d63fdfd6cd9cd6879130a20838a009c11e1de 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -363,8 +363,7 @@ void taos_init_imp(void) { SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; catalogInit(&cfg); - SSchedulerCfg scfg = {.maxJobNum = 100}; - schedulerInit(&scfg); + schedulerInit(); tscDebug("starting to initialize TAOS driver"); taosSetCoreDump(true); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 04d6df3be01a707c55369c46059c98152a7dc2cc..5e620d106045f341af6ed1dde9653abd1da9ea4e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -834,6 +834,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" 
PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->prevCode = code; + schedulerFreeJob(&pRequest->body.queryJob, 0); doAsyncQuery(pRequest, true); return; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 6ab16b722e79dcdabc26cdf7c0bad25c6d3e6326..62052457fd61d8e0f408ef9c0d5cb0b5c6cc702d 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -131,6 +131,7 @@ void taos_close(TAOS *taos) { STscObj *pObj = acquireTscObj(*(int64_t *)taos); if (NULL == pObj) { + taosMemoryFree(taos); return; } diff --git a/source/libs/command/CMakeLists.txt b/source/libs/command/CMakeLists.txt index 51118f4a34be578a62ae9ec4de0f02cb7fbabb6b..a890972d149531d01620cebc9f0ca2db0166fd38 100644 --- a/source/libs/command/CMakeLists.txt +++ b/source/libs/command/CMakeLists.txt @@ -8,9 +8,9 @@ target_include_directories( target_link_libraries( command - PRIVATE os util nodes catalog function transport qcom + PRIVATE os util nodes catalog function transport qcom scheduler ) if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) -endif(${BUILD_TEST}) \ No newline at end of file +endif(${BUILD_TEST}) diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index 6aca581f45d186e76ab2c4b38a86f2851dd6810e..7012c889e9d216433342d785d207a5ecfc7d129b 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -77,6 +77,10 @@ extern "C" { #define EXPLAIN_MODE_FORMAT "mode=%s" #define EXPLAIN_STRING_TYPE_FORMAT "%s" +#define COMMAND_RESET_LOG "resetLog" +#define COMMAND_SCHEDULE_POLICY "schedulePolicy" +#define COMMAND_ENABLE_RESCHEDULE "enableReSchedule" + typedef struct SExplainGroup { int32_t nodeNum; int32_t physiPlanExecNum; diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 034778e5bfd4f527ca4782d7e91015244db8b012..d22b3d88b4f33c1401e29a05726f92086c21f633 100644 --- a/source/libs/command/src/command.c +++ 
b/source/libs/command/src/command.c @@ -17,6 +17,8 @@ #include "catalog.h" #include "tdatablock.h" #include "tglobal.h" +#include "commandInt.h" +#include "scheduler.h" extern SConfig* tsCfg; @@ -479,7 +481,42 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableR return execShowCreateTable(pStmt, pRsp); } +static int32_t execAlterCmd(char* cmd, char* value, bool* processed) { + int32_t code = 0; + + if (0 == strcasecmp(cmd, COMMAND_RESET_LOG)) { + taosResetLog(); + cfgDumpCfg(tsCfg, 0, false); + } else if (0 == strcasecmp(cmd, COMMAND_SCHEDULE_POLICY)) { + code = schedulerUpdatePolicy(atoi(value)); + } else if (0 == strcasecmp(cmd, COMMAND_ENABLE_RESCHEDULE)) { + code = schedulerEnableReSchedule(atoi(value)); + } else { + goto _return; + } + + *processed = true; + +_return: + + if (code) { + terrno = code; + } + + return code; +} + static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { + bool processed = false; + + if (execAlterCmd(pStmt->config, pStmt->value, &processed)) { + return terrno; + } + + if (processed) { + goto _return; + } + if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) { return terrno; } @@ -488,6 +525,8 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { return terrno; } +_return: + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index b00b08a66d4835f0a01fdde843582910b0ac8f3d..7265e7ee7802081b37c04dbc774532ea56e73883 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -388,6 +388,11 @@ static void destroyDataSinkNode(SDataSinkNode* pNode) { nodesDestroyNode((SNode* static void destroyExprNode(SExprNode* pExpr) { taosArrayDestroy(pExpr->pAssociation); } +static void nodesDestroyNodePointer(void* node) { + SNode* pNode = *(SNode**)node; + nodesDestroyNode(pNode); +} + void nodesDestroyNode(SNode* pNode) { if (NULL == pNode) { return; @@ -718,6 +723,7 @@ void 
nodesDestroyNode(SNode* pNode) { } taosArrayDestroy(pQuery->pDbList); taosArrayDestroy(pQuery->pTableList); + taosArrayDestroyEx(pQuery->pPlaceholderValues, nodesDestroyNodePointer); break; } case QUERY_NODE_LOGIC_PLAN_SCAN: { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 539643c3907cc20de3bee586d21135fbe8adb748..d8d7c5a0ea82543118e9570c3ce9358d01ff49bf 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -378,6 +378,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet); int32_t qwAddTaskCtx(QW_FPARAMS_DEF); +int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx); #ifdef __cplusplus diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 704cd3142845c07976f1d8a4b8a798bc99e775d3..acb7004a510cb171562cbe94775f5149fabc7847 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -24,7 +24,7 @@ extern "C" { #include "dataSinkMgt.h" int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF); -int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); +int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 869eedf8f6e4970f548300b33b10c8bba246d92e..fa63cf2c3ae16d424ac702b7ef47fadadcc14235 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -9,7 +9,7 @@ #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = true}; +SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false}; int32_t 
qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -147,9 +147,9 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int return TSDB_CODE_SUCCESS; } -int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { +int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { if (gQWDebug.tmp) { - if (TDMT_SCH_QUERY == qwMsg->msgType) { + if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { SEpSet epSet = {0}; epSet.inUse = 1; epSet.numOfEps = 3; @@ -159,16 +159,15 @@ int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { epSet.eps[1].port = 7200; strcpy(epSet.eps[2].fqdn, "localhost"); epSet.eps[2].port = 7300; - + + ctx->phase = QW_PHASE_POST_QUERY; qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet); - gQWDebug.tmp = false; return TSDB_CODE_SUCCESS; } - if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) { + if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { ctx->phase = QW_PHASE_POST_QUERY; qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); - gQWDebug.tmp = false; return TSDB_CODE_SUCCESS; } } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 73110472f7e35e71ee4bf478c422664212e9660c..93268e1bccd35640660f911f07c8066effbd07bf 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -315,10 +315,10 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { int64_t rId = msg->refId; int32_t eId = msg->execId; - SQWMsg qwMsg = {.msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; + SQWMsg qwMsg = {.msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle); - QW_ERR_RET(qwPrerocessQuery(QW_FPARAMS(), &qwMsg)); + 
QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg)); QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle); return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 3e8ced318c4baa2f1eec3aee85db03c6c67547ba..1b58dc28244fce3971392ecdcda66aec4d4ed876 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -469,7 +469,7 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { } -int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { +int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; bool queryRsped = false; SSubplan *plan = NULL; @@ -488,6 +488,8 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); + qwDbgResponseRedirect(qwMsg, ctx); + _return: if (ctx) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index b6b17cb1066e738e360bd7a8cd2d80c5b0d7def3..bc0270635d237df97b88823c0ea7a6badf2493d8 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -28,15 +28,6 @@ extern "C" { #include "trpc.h" #include "command.h" -#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 -#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 -#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT - -#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 -#define SCH_MAX_TASK_TIMEOUT_USEC 60000000 - -#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA - enum { SCH_READ = 1, SCH_WRITE, @@ -54,6 +45,24 @@ typedef enum { SCH_OP_GET_STATUS, } SCH_OP_TYPE; +typedef enum { + SCH_LOAD_SEQ = 1, + SCH_RANDOM, + SCH_ALL, +} SCH_POLICY; + +#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 +#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 +#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT +#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ + +#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 +#define SCH_MAX_TASK_TIMEOUT_USEC 
60000000 +#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA + + + + typedef struct SSchDebug { bool lockEnable; bool apiEnable; @@ -126,6 +135,13 @@ typedef struct SSchStatusFps { schStatusEventFp eventFp; } SSchStatusFps; +typedef struct SSchedulerCfg { + uint32_t maxJobNum; + int32_t maxNodeTableNum; + SCH_POLICY schPolicy; + bool enableReSchedule; +} SSchedulerCfg; + typedef struct SSchedulerMgmt { uint64_t taskId; // sequential taksId uint64_t sId; // schedulerId @@ -184,34 +200,36 @@ typedef struct SSchLevel { typedef struct SSchTaskProfile { int64_t startTs; - int64_t* execTime; + SArray* execTime; int64_t waitTime; int64_t endTs; } SSchTaskProfile; typedef struct SSchTask { - uint64_t taskId; // task id - SRWLatch lock; // task reentrant lock - int32_t maxExecTimes; // task may exec times - int32_t execId; // task current execute try index - SSchLevel *level; // level - SRWLatch planLock; // task update plan lock - SSubplan *plan; // subplan - char *msg; // operator tree - int32_t msgLen; // msg length - int8_t status; // task status - int32_t lastMsgType; // last sent msg type - int64_t timeoutUsec; // taks timeout useconds before reschedule - SQueryNodeAddr succeedAddr; // task executed success node address - int8_t candidateIdx; // current try condidation index - SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr - SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo - SSchTaskProfile profile; // task execution profile - int32_t childReady; // child task ready number - SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* - SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* - void* handle; // task send handle - bool registerdHb; // registered in hb + uint64_t taskId; // task id + SRWLatch lock; // task reentrant lock + int32_t maxExecTimes; // task max exec times + int32_t maxRetryTimes; // task max retry times 
+ int32_t retryTimes; // task retry times + int32_t execId; // task current execute index + SSchLevel *level; // level + SRWLatch planLock; // task update plan lock + SSubplan *plan; // subplan + char *msg; // operator tree + int32_t msgLen; // msg length + int8_t status; // task status + int32_t lastMsgType; // last sent msg type + int64_t timeoutUsec; // task timeout useconds before reschedule + SQueryNodeAddr succeedAddr; // task executed success node address + int8_t candidateIdx; // current try candidate index + SArray *candidateAddrs; // candidate node addresses, element is SQueryNodeAddr + SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo + SSchTaskProfile profile; // task execution profile + int32_t childReady; // child task ready number + SArray *children; // the datasource tasks, from which to fetch the result, element is SQueryTask* + SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* + void* handle; // task send handle + bool registerdHb; // registered in hb } SSchTask; typedef struct SSchJobAttr { @@ -265,7 +283,7 @@ typedef struct SSchJob { extern SSchedulerMgmt schMgmt; -#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execTime[(_task)->execId % (_task)->maxExecTimes]) > (_task)->timeoutUsec) +#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec) #define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) @@ -299,7 +317,6 @@ extern SSchedulerMgmt schMgmt; #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? 
TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) -#define SCH_TASK_MAX_EXEC_TIMES(_levelIdx, _levelNum) (SCH_MAX_CANDIDATE_EP_NUM * ((_levelNum) - (_levelIdx))) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) @@ -321,8 +338,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOG_TASK_START_TS(_task) \ do { \ int64_t us = taosGetTimestampUs(); \ - int32_t idx = (_task)->execId % (_task)->maxExecTimes; \ - (_task)->profile.execTime[idx] = us; \ + taosArrayPush((_task)->profile.execTime, &us); \ if (0 == (_task)->execId) { \ (_task)->profile.startTs = us; \ } \ @@ -331,8 +347,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOG_TASK_WAIT_TS(_task) \ do { \ int64_t us = taosGetTimestampUs(); \ - int32_t idx = (_task)->execId % (_task)->maxExecTimes; \ - (_task)->profile.waitTime += us - (_task)->profile.execTime[idx]; \ + (_task)->profile.waitTime += us - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId); \ } while (0) @@ -340,7 +355,8 @@ extern SSchedulerMgmt schMgmt; do { \ int64_t us = taosGetTimestampUs(); \ int32_t idx = (_task)->execId % (_task)->maxExecTimes; \ - (_task)->profile.execTime[idx] = us - (_task)->profile.execTime[idx]; \ + int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \ + *startts = us - *startts; \ (_task)->profile.endTs = us; \ } while (0) @@ -471,9 +487,11 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask); void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); -int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum); +int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel 
*pLevel); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode); +int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode); +int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 19bb93249f50d22bb535e25654fc93bbab11a981..1b1268baf1bb58e9fcf51ccef36dac8afe0f6a53 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -343,7 +343,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel, levelNum)); + SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel)); SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask)); @@ -476,7 +476,7 @@ _return: SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); } -int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { +int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { schUpdateJobErrCode(pJob, errCode); int32_t code = atomic_load_32(&pJob->errCode); @@ -489,21 +489,29 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR); } -// Note: no more task error processing, handled in function internal -int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { +int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) { if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { return TSDB_CODE_SCH_IGNORE_ERROR; } - schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAIL, errCode); + schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode); return TSDB_CODE_SCH_IGNORE_ERROR; } -// Note: no more error processing, handled in function internal int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { - SCH_RET(schProcessOnJobFailureImpl(pJob, 
JOB_TASK_STATUS_DROP, errCode)); + SCH_RET(schProcessOnJobFailure(pJob, errCode)); +} + +int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) { + if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { + return TSDB_CODE_SCH_IGNORE_ERROR; + } + + schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode); + return TSDB_CODE_SCH_IGNORE_ERROR; } + int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_EXEC); @@ -828,7 +836,7 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int } if (errCode) { - schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&errCode); + schHandleJobFailure(pJob, errCode); } SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode)); @@ -907,7 +915,7 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { } if (errCode) { - schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&errCode); + schHandleJobFailure(pJob, errCode); } if (pJob) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 236257666b23b93d174f8f1126497f061dfd90c9..9483ecd6ebfbfd17d301280eed0191b6c26b6255 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -42,32 +42,47 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { taosHashCleanup(pTask->execNodes); } - taosMemoryFree(pTask->profile.execTime); + taosArrayDestroy(pTask->profile.execTime); } -int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) { +void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) { + if (SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy)) { + pTask->maxRetryTimes = SCH_MAX_CANDIDATE_EP_NUM; + } else { + int32_t nodeNum = taosArrayGetSize(pJob->nodeList); + pTask->maxRetryTimes = TMAX(nodeNum, SCH_MAX_CANDIDATE_EP_NUM); + } + + pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1); +} + 
+int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { int32_t code = 0; pTask->plan = pPlan; pTask->level = pLevel; pTask->execId = -1; - pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES(pLevel->level, levelNum); pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->taskId = schGenTaskId(); pTask->execNodes = taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - pTask->profile.execTime = taosMemoryCalloc(pTask->maxExecTimes, sizeof(int64_t)); + + schInitTaskRetryTimes(pJob, pTask, pLevel); + + pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t)); if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); + SCH_TASK_DLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes); + return TSDB_CODE_SUCCESS; _return: - taosMemoryFreeClear(pTask->profile.execTime); + taosArrayDestroy(pTask->profile.execTime); taosHashCleanup(pTask->execNodes); SCH_RET(code); @@ -105,7 +120,7 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ } if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) { - SCH_TASK_ELOG("fail to remove execId %d from execNodeList", execId); + SCH_TASK_DLOG("execId %d already not in execNodeList", execId); } else { SCH_TASK_DLOG("execId %d removed from execNodeList", execId); } @@ -235,7 +250,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { } if (pTask->level->taskFailed > 0) { - SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, NULL)); + SCH_RET(schHandleJobFailure(pJob, pJob->errCode)); } else { SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); } @@ -285,6 +300,10 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { } int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { + if (!schMgmt.cfg.enableReSchedule) { 
+ return TSDB_CODE_SUCCESS; + } + if (SCH_IS_DATA_BIND_TASK(pTask)) { return TSDB_CODE_SUCCESS; } @@ -304,13 +323,17 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; - if ((pTask->execId + 1) >= pTask->maxExecTimes) { - SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); - schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void *)&rspCode); - return TSDB_CODE_SUCCESS; + SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); + + if (NULL == pData) { + pTask->retryTimes = 0; } - SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); + if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) { + SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes, pTask->maxExecTimes, pTask->execId); + schHandleJobFailure(pJob, rspCode); + return TSDB_CODE_SUCCESS; + } schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); @@ -493,9 +516,15 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo } } + if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes, pTask->maxRetryTimes); + return TSDB_CODE_SUCCESS; + } + if ((pTask->execId + 1) >= pTask->maxExecTimes) { *needRetry = false; - SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); + SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes); return TSDB_CODE_SUCCESS; } @@ -649,10 +678,31 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { int32_t candidateNum = 
taosArrayGetSize(pTask->candidateAddrs); - if (++pTask->candidateIdx >= candidateNum) { - pTask->candidateIdx = 0; + if (candidateNum <= 1) { + goto _return; + } + + switch (schMgmt.cfg.schPolicy) { + case SCH_LOAD_SEQ: + case SCH_ALL: + default: + if (++pTask->candidateIdx >= candidateNum) { + pTask->candidateIdx = 0; + } + break; + case SCH_RANDOM: { + int32_t lastIdx = pTask->candidateIdx; + while (lastIdx == pTask->candidateIdx) { + pTask->candidateIdx = taosRand() % candidateNum; + } + break; + } } - SCH_TASK_DLOG("switch task candiateIdx to %d", pTask->candidateIdx); + +_return: + + SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum); + return TSDB_CODE_SUCCESS; } @@ -739,8 +789,9 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); pTask->execId++; + pTask->retryTimes++; - SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execId); + SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes); SCH_LOG_TASK_START_TS(pTask); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 39465f3064e9b117ca97c345223b17ce83e95a3d..3a15523040f7c0c108273563c17f8000c43ff582 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -22,26 +22,19 @@ SSchedulerMgmt schMgmt = { .jobRef = -1, }; -int32_t schedulerInit(SSchedulerCfg *cfg) { +int32_t schedulerInit() { if (schMgmt.jobRef >= 0) { qError("scheduler already initialized"); return TSDB_CODE_QRY_INVALID_INPUT; } - if (cfg) { - schMgmt.cfg = *cfg; - - if (schMgmt.cfg.maxJobNum == 0) { - schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; - } - if (schMgmt.cfg.maxNodeTableNum <= 0) { - schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM; - } - } else { - schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; - schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM; - } + 
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; + schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM; + schMgmt.cfg.schPolicy = SCHEDULE_DEFAULT_POLICY; + schMgmt.cfg.enableReSchedule = true; + qDebug("schedule policy init to %d", schMgmt.cfg.schPolicy); + schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl); if (schMgmt.jobRef < 0) { qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum); @@ -130,6 +123,26 @@ void schedulerStopQueryHb(void *pTrans) { schCleanClusterHb(pTrans); } +int32_t schedulerUpdatePolicy(int32_t policy) { + switch (policy) { + case SCH_LOAD_SEQ: + case SCH_RANDOM: + case SCH_ALL: + schMgmt.cfg.schPolicy = policy; + qDebug("schedule policy updated to %d", schMgmt.cfg.schPolicy); + break; + default: + return TSDB_CODE_TSC_INVALID_INPUT; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t schedulerEnableReSchedule(bool enableResche) { + schMgmt.cfg.enableReSchedule = enableResche; + return TSDB_CODE_SUCCESS; +} + void schedulerFreeJob(int64_t* jobId, int32_t errCode) { if (0 == *jobId) { return; @@ -141,7 +154,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) { return; } - schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)&errCode); + schHandleJobDrop(pJob, errCode); schReleaseJob(*jobId); *jobId = 0; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index d6b1baf978bdc823b3337e80830ec89f5de4e790..ca2122ed8f433c4b8dd70eb120e0eba972dda9fe 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -477,7 +477,7 @@ void* schtRunJobThread(void *aa) { schtInitLogFile(); - int32_t code = schedulerInit(NULL); + int32_t code = schedulerInit(); assert(code == 0); @@ -649,7 +649,7 @@ TEST(queryTest, normalCase) { qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); - int32_t code = schedulerInit(NULL); + int32_t code = schedulerInit(); ASSERT_EQ(code, 0); 
schtBuildQueryDag(&dag); @@ -756,7 +756,7 @@ TEST(queryTest, readyFirstCase) { qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); - int32_t code = schedulerInit(NULL); + int32_t code = schedulerInit(); ASSERT_EQ(code, 0); schtBuildQueryDag(&dag); @@ -866,7 +866,7 @@ TEST(queryTest, flowCtrlCase) { qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); - int32_t code = schedulerInit(NULL); + int32_t code = schedulerInit(); ASSERT_EQ(code, 0); schtBuildQueryFlowCtrlDag(&dag); @@ -975,7 +975,7 @@ TEST(insertTest, normalCase) { qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); - int32_t code = schedulerInit(NULL); + int32_t code = schedulerInit(); ASSERT_EQ(code, 0); schtBuildInsertDag(&dag); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 2364c53a9abd9c15a858e912eeb856d44e25474d..95334b553579b9f8f45b6c152479207e71317754 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -135,7 +135,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_API_ERROR, "Stmt API usage error" TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_QUERY_KILLED, "Query killed") -TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node in current query policy configuration") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NOT_STABLE_ERROR, "Table is not a super table") // mnode-common diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index b31c39718ca9addbf4e8e2980d57665bb3d19156..29c1fdb015689db8c2bd700adde4112dafe48e0b 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -2685,6 +2685,8 @@ int main(int argc, char *argv[]) runAll(taos); + taos_close(taos); + return 0; }