diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index cf5d17cd743aae6f36e7128e067fa66d788bbbb1..c1516969ff167777e7cf83e81e9fcdbc5c06bb9c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -192,6 +192,7 @@ typedef struct SSubplan { int32_t msgType; // message type for subplan, used to denote the send message type to vnode. int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner. SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node + SQueryNodeStat execNodeStat; // only for scan subplan SNodeList* pChildren; // the datasource subplan,from which to fetch the result SNodeList* pParents; // the data destination subplan, get data from current subplan SPhysiNode* pNode; // physical plan of current subplan diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 6d3f97fc4e3e0d0f335af3cc269713309f739e35..ad1f988f4c28f030037846e1217a5d127418d781 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -35,7 +35,6 @@ enum { JOB_TASK_STATUS_CANCELLING, JOB_TASK_STATUS_CANCELLED, JOB_TASK_STATUS_DROPPING, - JOB_TASK_STATUS_FREEING, }; enum { @@ -133,6 +132,10 @@ typedef struct SQueryNodeAddr { SEpSet epset; } SQueryNodeAddr; +typedef struct SQueryNodeStat { + double tableNum; //table number in million +} SQueryNodeStat; + int32_t initTaskQueue(); int32_t cleanupTaskQueue(); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 518cabb636eaaea56a4ee8d4a548489069640c75..4f0f0b49530f868fc61b1a0c6b57d1965cd5263c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -25,6 +25,7 @@ extern "C" { typedef struct SSchedulerCfg { uint32_t maxJobNum; + double maxNodeTableNum; } SSchedulerCfg; typedef struct SQueryProfileSummary { diff --git a/source/libs/parser/src/astCreateFuncs.c b/source/libs/parser/src/astCreateFuncs.c index 
a531f3de71990ad68673a754b1ad79b4b9e45331..a9e10da143a548fb3ab90590cbce21eba53c6882 100644 --- a/source/libs/parser/src/astCreateFuncs.c +++ b/source/libs/parser/src/astCreateFuncs.c @@ -44,7 +44,7 @@ static SDatabaseOptions* setDbBlocks(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_TOTAL_BLOCKS || val > TSDB_MAX_TOTAL_BLOCKS) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option totalBlocks: %d valid range: [%d, %d]", val, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS); + "invalid db option totalBlocks: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS); pCxt->valid = false; return pOptions; } @@ -56,7 +56,7 @@ static SDatabaseOptions* setDbCache(SAstCreateContext* pCxt, SDatabaseOptions* p int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_CACHE_BLOCK_SIZE || val > TSDB_MAX_CACHE_BLOCK_SIZE) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option cacheBlockSize: %d valid range: [%d, %d]", val, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MAX_CACHE_BLOCK_SIZE); + "invalid db option cacheBlockSize: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MAX_CACHE_BLOCK_SIZE); pCxt->valid = false; return pOptions; } @@ -68,7 +68,7 @@ static SDatabaseOptions* setDbCacheLast(SAstCreateContext* pCxt, SDatabaseOption int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_CACHE_LAST_ROW || val > TSDB_MAX_DB_CACHE_LAST_ROW) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option cacheLast: %d valid range: [%d, %d]", val, TSDB_MIN_DB_CACHE_LAST_ROW, TSDB_MAX_DB_CACHE_LAST_ROW); + "invalid db option cacheLast: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DB_CACHE_LAST_ROW, TSDB_MAX_DB_CACHE_LAST_ROW); pCxt->valid = false; return pOptions; } @@ -80,7 +80,7 @@ static SDatabaseOptions* setDbComp(SAstCreateContext* pCxt, SDatabaseOptions* pO int64_t val = strtol(pVal->z, NULL, 10); if (val 
< TSDB_MIN_COMP_LEVEL || val > TSDB_MAX_COMP_LEVEL) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option compression: %d valid range: [%d, %d]", val, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL); + "invalid db option compression: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL); pCxt->valid = false; return pOptions; } @@ -92,7 +92,7 @@ static SDatabaseOptions* setDbDays(SAstCreateContext* pCxt, SDatabaseOptions* pO int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DAYS_PER_FILE || val > TSDB_MAX_DAYS_PER_FILE) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option daysPerFile: %d valid range: [%d, %d]", val, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE); + "invalid db option daysPerFile: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE); pCxt->valid = false; return pOptions; } @@ -104,7 +104,7 @@ static SDatabaseOptions* setDbFsync(SAstCreateContext* pCxt, SDatabaseOptions* p int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option fsyncPeriod: %d valid range: [%d, %d]", val, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD); + "invalid db option fsyncPeriod: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD); pCxt->valid = false; return pOptions; } @@ -116,7 +116,7 @@ static SDatabaseOptions* setDbMaxRows(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_MAX_ROW_FBLOCK || val > TSDB_MAX_MAX_ROW_FBLOCK) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option maxRowsPerBlock: %d valid range: [%d, %d]", val, TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK); + "invalid db option maxRowsPerBlock: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK); pCxt->valid = 
false; return pOptions; } @@ -128,7 +128,7 @@ static SDatabaseOptions* setDbMinRows(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_MIN_ROW_FBLOCK || val > TSDB_MAX_MIN_ROW_FBLOCK) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option minRowsPerBlock: %d valid range: [%d, %d]", val, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK); + "invalid db option minRowsPerBlock: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK); pCxt->valid = false; return pOptions; } @@ -140,7 +140,7 @@ static SDatabaseOptions* setDbKeep(SAstCreateContext* pCxt, SDatabaseOptions* pO int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_KEEP || val > TSDB_MAX_KEEP) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option keep: %d valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP); + "invalid db option keep: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP); pCxt->valid = false; return pOptions; } @@ -168,7 +168,7 @@ static SDatabaseOptions* setDbQuorum(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_QUORUM_OPTION || val > TSDB_MAX_DB_QUORUM_OPTION) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option quorum: %d valid range: [%d, %d]", val, TSDB_MIN_DB_QUORUM_OPTION, TSDB_MAX_DB_QUORUM_OPTION); + "invalid db option quorum: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DB_QUORUM_OPTION, TSDB_MAX_DB_QUORUM_OPTION); pCxt->valid = false; return pOptions; } @@ -180,7 +180,7 @@ static SDatabaseOptions* setDbReplica(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_REPLICA_OPTION || val > TSDB_MAX_DB_REPLICA_OPTION) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option replications: %d valid range: [%d, %d]", val, TSDB_MIN_DB_REPLICA_OPTION, 
TSDB_MAX_DB_REPLICA_OPTION); + "invalid db option replications: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DB_REPLICA_OPTION, TSDB_MAX_DB_REPLICA_OPTION); pCxt->valid = false; return pOptions; } @@ -192,7 +192,7 @@ static SDatabaseOptions* setDbTtl(SAstCreateContext* pCxt, SDatabaseOptions* pOp int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_TTL_OPTION) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option ttl: %d, should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION); + "invalid db option ttl: %" PRId64 ", should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION); pCxt->valid = false; return pOptions; } @@ -203,7 +203,7 @@ static SDatabaseOptions* setDbTtl(SAstCreateContext* pCxt, SDatabaseOptions* pOp static SDatabaseOptions* setDbWal(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) { int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_WAL_LEVEL || val > TSDB_MAX_WAL_LEVEL) { - snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option walLevel: %d, only 1-2 allowed", val); + snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option walLevel: %" PRId64 ", only 1-2 allowed", val); pCxt->valid = false; return pOptions; } @@ -215,7 +215,7 @@ static SDatabaseOptions* setDbVgroups(SAstCreateContext* pCxt, SDatabaseOptions* int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid db option vgroups: %d valid range: [%d, %d]", val, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); + "invalid db option vgroups: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); pCxt->valid = false; return pOptions; } @@ -226,7 +226,7 @@ static SDatabaseOptions* setDbVgroups(SAstCreateContext* pCxt, SDatabaseOptions* static SDatabaseOptions* setDbSingleStable(SAstCreateContext* pCxt, 
SDatabaseOptions* pOptions, const SToken* pVal) { int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_SINGLE_STABLE_OPTION || val > TSDB_MAX_DB_SINGLE_STABLE_OPTION) { - snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option singleStable: %d, only 0-1 allowed", val); + snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option singleStable: %" PRId64 ", only 0-1 allowed", val); pCxt->valid = false; return pOptions; } @@ -237,7 +237,7 @@ static SDatabaseOptions* setDbSingleStable(SAstCreateContext* pCxt, SDatabaseOpt static SDatabaseOptions* setDbStreamMode(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) { int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_STREAM_MODE_OPTION || val > TSDB_MAX_DB_STREAM_MODE_OPTION) { - snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option streamMode: %d, only 0-1 allowed", val); + snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option streamMode: %" PRId64 ", only 0-1 allowed", val); pCxt->valid = false; return pOptions; } @@ -269,7 +269,7 @@ static STableOptions* setTableKeep(SAstCreateContext* pCxt, STableOptions* pOpti int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_KEEP || val > TSDB_MAX_KEEP) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid table option keep: %d valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP); + "invalid table option keep: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP); pCxt->valid = false; return pOptions; } @@ -281,7 +281,7 @@ static STableOptions* setTableTtl(SAstCreateContext* pCxt, STableOptions* pOptio int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_DB_TTL_OPTION) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid table option ttl: %d, should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION); + "invalid table option ttl: %" PRId64 ", should be greater than or equal to 
%d", val, TSDB_MIN_DB_TTL_OPTION); pCxt->valid = false; return pOptions; } @@ -292,7 +292,7 @@ static STableOptions* setTableTtl(SAstCreateContext* pCxt, STableOptions* pOptio static STableOptions* setTableComment(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal) { if (pVal->n >= sizeof(pOptions->comments)) { snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, - "invalid table option comment, length cannot exceed %d", sizeof(pOptions->comments) - 1); + "invalid table option comment, length cannot exceed %d", (int32_t)(sizeof(pOptions->comments) - 1)); pCxt->valid = false; return pOptions; } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index f9259fd645bb2bc79a81174efd32b03097597988..7813251bfb39ce794b3e4028137cb6d5ee416ea2 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -26,8 +26,9 @@ extern "C" { #include "scheduler.h" #include "thash.h" -#define SCHEDULE_DEFAULT_JOB_NUMBER 1000 -#define SCHEDULE_DEFAULT_TASK_NUMBER 1000 +#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 +#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 +#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20.0 /* in million */ #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA @@ -72,17 +73,26 @@ typedef struct SSchCallbackParam { uint64_t queryId; int64_t refId; uint64_t taskId; + SEpSet epSet; } SSchCallbackParam; +typedef struct SSchFlowControl { + SRWLatch lock; + double tableNumSum; + uint32_t execTaskNum; + SArray *taskList; // Element is SQueryTask* +} SSchFlowControl; + typedef struct SSchLevel { - int32_t level; - int8_t status; - SRWLatch lock; - int32_t taskFailed; - int32_t taskSucceed; - int32_t taskNum; - int32_t taskLaunchIdx; // launch startup index - SArray *subTasks; // Element is SQueryTask + int32_t level; + int8_t status; + SRWLatch lock; + int32_t taskFailed; + int32_t taskSucceed; + int32_t taskNum; + int32_t taskLaunchedNum; + SHashObj *flowCtrl; // key is ep, 
element is SSchFlowControl + SArray *subTasks; // Element is SQueryTask } SSchLevel; typedef struct SSchTask { @@ -102,13 +112,14 @@ typedef struct SSchTask { int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* - void* handle; // task send handle + void* handle; // task send handle } SSchTask; typedef struct SSchJobAttr { bool needFetch; bool syncSchedule; bool queryJob; + bool needFlowCtrl; } SSchJobAttr; typedef struct SSchJob { @@ -140,6 +151,8 @@ typedef struct SSchJob { SQueryProfileSummary summary; } SSchJob; +extern SSchedulerMgmt schMgmt; + #define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) @@ -152,8 +165,17 @@ typedef struct SSchJob { #define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st) #define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) -#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != SUBPLAN_TYPE_MODIFY) -#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob) +#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true +#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) +#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) + +#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY) +#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) +#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job) +#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) +#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) +#define SCH_GET_CUR_EP(_addr) 
(&(_addr)->epset.eps[(_addr)->epset.inUse]) +#define SCH_SWITCH_EPSET(_addr) ((_addr)->epset.inUse = ((_addr)->epset.inUse + 1) % (_addr)->epset.numOfEps) #define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) @@ -173,10 +195,18 @@ typedef struct SSchJob { #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) -static int32_t schLaunchTask(SSchJob *job, SSchTask *task); -static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); +int32_t schLaunchTask(SSchJob *job, SSchTask *task); +int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); SSchJob *schAcquireJob(int64_t refId); int32_t schReleaseJob(int64_t refId); +void schFreeFlowCtrl(SSchLevel *pLevel); +int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); +int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); +int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); +int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); +int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); +int32_t schFetchFromRemote(SSchJob *pJob); + #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c new file mode 100644 index 0000000000000000000000000000000000000000..2b3b5e20aeb17c70eb7704209ba7261a1ec3fedb --- /dev/null +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "schedulerInt.h" +#include "tmsg.h" +#include "query.h" +#include "catalog.h" +#include "tref.h" + +void schFreeFlowCtrl(SSchLevel *pLevel) { + if (NULL == pLevel->flowCtrl) { + return; + } + + SSchFlowControl *ctrl = NULL; + void *pIter = taosHashIterate(pLevel->flowCtrl, NULL); + while (pIter) { + ctrl = (SSchFlowControl *)pIter; + + if (ctrl->taskList) { + taosArrayDestroy(ctrl->taskList); + } + + pIter = taosHashIterate(pLevel->flowCtrl, pIter); + } + + taosHashCleanup(pLevel->flowCtrl); + pLevel->flowCtrl = NULL; +} + +int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { + if (!SCH_IS_QUERY_JOB(pJob)) { + return TSDB_CODE_SUCCESS; + } + + double sum = 0; + + for (int32_t i = 0; i < pLevel->taskNum; ++i) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); + + sum += pTask->plan->execNodeStat.tableNum; + } + + if (sum < schMgmt.cfg.maxNodeTableNum) { + return TSDB_CODE_SUCCESS; + } + + pLevel->flowCtrl = taosHashInit(pLevel->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pLevel->flowCtrl) { + SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pLevel->taskNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SCH_SET_JOB_NEED_FLOW_CTRL(pJob); + + return TSDB_CODE_SUCCESS; +} + +int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) { + SSchLevel *pLevel = pTask->level; + SSchFlowControl *ctrl = NULL; + int32_t code = 0; + SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); + + ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); + if (NULL == ctrl) { + SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, 
ep->port); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SCH_LOCK(SCH_WRITE, &ctrl->lock); + if (ctrl->execTaskNum <= 0) { + SCH_TASK_ELOG("invalid execTaskNum 0 in flowCtrl, fqdn:%s, port:%d", ep->fqdn, ep->port); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + --ctrl->execTaskNum; + ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum; + +_return: + + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + + SCH_RET(code); +} + +int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { + SSchLevel *pLevel = pTask->level; + int32_t code = 0; + SSchFlowControl *ctrl = NULL; + + do { + ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp)); + if (NULL == ctrl) { + SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1}; + + code = taosHashPut(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp), &nctrl, sizeof(nctrl)); + if (code) { + if (HASH_NODE_EXIST(code)) { + continue; + } + + SCH_TASK_ELOG("taosHashPut flowCtrl failed, size:%d", (int32_t)sizeof(nctrl)); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + *enough = true; + return TSDB_CODE_SUCCESS; + } + + SCH_LOCK(SCH_WRITE, &ctrl->lock); + + if (0 == ctrl->execTaskNum) { + ctrl->tableNumSum = pTask->plan->execNodeStat.tableNum; + ++ctrl->execTaskNum; + + *enough = true; + break; + } + + double sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum; + + if (sum <= schMgmt.cfg.maxNodeTableNum) { + ctrl->tableNumSum = sum; + ++ctrl->execTaskNum; + + *enough = true; + break; + } + + if (NULL == ctrl->taskList) { + ctrl->taskList = taosArrayInit(pLevel->taskNum, POINTER_BYTES); + if (NULL == ctrl->taskList) { + SCH_TASK_ELOG("taosArrayInit taskList failed, size:%d", (int32_t)pLevel->taskNum); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + if (NULL == taosArrayPush(ctrl->taskList, &pTask)) { + SCH_TASK_ELOG("taosArrayPush to taskList failed, size:%d",
(int32_t)taosArrayGetSize(ctrl->taskList)); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + *enough = false; + + break; + } while (true); + +_return: + + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + + SCH_RET(code); +} + +int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { + do { + SCH_LOCK(SCH_WRITE, &ctrl->lock); + + if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) { + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + return TSDB_CODE_SUCCESS; + } + + SSchTask *pTask = *(SSchTask **)taosArrayPop(ctrl->taskList); + + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + + bool enough = false; + SCH_ERR_RET(schCheckIncTaskFlowQuota(pJob, pTask, &enough)); + + if (enough) { + SCH_ERR_RET(schLaunchTaskImpl(pJob, pTask)); + continue; + } + + break; + } while (true); + + return TSDB_CODE_SUCCESS; +} + + +int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) { + if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); + + SSchLevel *pLevel = pTask->level; + SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); + + SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); + if (NULL == ctrl) { + SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SCH_RET(schLaunchTasksInFlowCtrlListImpl(pJob, ctrl)); + +} + + diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 2b5ad4f26b201251d2d35f0725a5f9a308b8b200..e6a5c285de5ad809c5a6d440e8e27c0b8ba77455 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -91,6 +91,18 @@ void schFreeTask(SSchTask* pTask) { } +static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { + int8_t status = SCH_GET_JOB_STATUS(pJob); + if (pStatus) { + *pStatus = status; + } + + return (status == JOB_TASK_STATUS_FAILED ||
status == JOB_TASK_STATUS_CANCELLED + || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING + || status == JOB_TASK_STATUS_SUCCEED); +} + + int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType); @@ -197,6 +209,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { _return: SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus); + SCH_ERR_RET(code); } @@ -275,7 +288,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } SSchLevel *pLevel = taosArrayGet(pJob->levels, 0); - if (pJob->attr.queryJob && pLevel->taskNum > 1) { + if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) { SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -285,10 +298,9 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { - int32_t idx = atomic_load_8(&pTask->candidateIdx); - SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx); + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); if (NULL == addr) { - SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", idx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -323,9 +335,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? 
TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_MAX_TASK_NUM, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == planToTask) { - SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); + SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -379,7 +391,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { for (int32_t n = 0; n < taskNum; ++n) { SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, n); - SCH_SET_JOB_TYPE(&pJob->attr, plan->subplanType); + SCH_SET_JOB_TYPE(pJob, plan->subplanType); SSchTask task = {0}; SSchTask *pTask = &task; @@ -564,15 +576,45 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { } -int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { +int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... 
- // TODO if needRetry, set task retry info - // TODO set condidateIdx - // TODO record failed but tried task *needRetry = false; return TSDB_CODE_SUCCESS; + + //TODO CHECK epList/condidateList + if (SCH_IS_DATA_SRC_TASK(pTask)) { + + } else { + int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); + + if ((pTask->candidateIdx + 1) >= candidateNum) { + return TSDB_CODE_SUCCESS; + } + + ++pTask->candidateIdx; + } + + +} + +int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { + atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); + + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { + SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); + } + + if (SCH_IS_DATA_SRC_TASK(pTask)) { + SCH_SWITCH_EPSET(&pTask->plan->execNode); + } else { + ++pTask->candidateIdx; + } + + SCH_ERR_RET(schLaunchTask(pJob, pTask)); + + return TSDB_CODE_SUCCESS; } int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { @@ -588,14 +630,14 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod } int32_t code = atomic_load_32(&pJob->errCode); - SCH_ERR_RET(code); - SCH_JOB_ELOG("job errCode is invalid, errCode:%d", code); + SCH_JOB_DLOG("job failed with error: %s", tstrerror(code)); + + SCH_RET(code); } - -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode)); } @@ -606,38 +648,8 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { } -// Note: no more error processing, handled in function internal -int32_t schFetchFromRemote(SSchJob *pJob) { - int32_t code = 0; - - if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) { - SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch)); - return TSDB_CODE_SUCCESS; - } - - void *res = 
atomic_load_ptr(&pJob->res); - if (res) { - atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - SCH_JOB_DLOG("res already fetched, res:%p", res); - return TSDB_CODE_SUCCESS; - } - - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); - - return TSDB_CODE_SUCCESS; - -_return: - - atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - - schProcessOnJobFailure(pJob, code); - - return code; -} - - -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { int32_t code = 0; @@ -655,9 +667,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { _return: - SCH_ERR_RET(schProcessOnJobFailure(pJob, code)); - - SCH_RET(code); + SCH_RET(schProcessOnJobFailure(pJob, code)); } int32_t schProcessOnDataFetched(SSchJob *job) { @@ -665,8 +675,16 @@ int32_t schProcessOnDataFetched(SSchJob *job) { tsem_post(&job->rspSem); } -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { + int8_t status = 0; + + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_DLOG("task failed not processed cause of job status, job status:%d", status); + + SCH_RET(atomic_load_32(&pJob->errCode)); + } + bool needRetry = false; bool moved = false; int32_t taskDone = 0; @@ -674,16 +692,16 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode)); - SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); + SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry)); if (!needRetry) { SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode)); if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { - code = schMoveTaskToFailList(pJob, 
pTask, &moved); - if (code && moved) { - SCH_ERR_RET(errCode); - } + SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved)); + } else { + SCH_TASK_DLOG("task already done, no more failure process, status:%d", SCH_GET_TASK_STATUS(pTask)); + return TSDB_CODE_SUCCESS; } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); @@ -702,35 +720,29 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } } } else { - // Note: no more error processing, already handled - SCH_ERR_RET(schLaunchTask(pJob, pTask)); + SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask)); return TSDB_CODE_SUCCESS; } _return: - SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode)); - - SCH_ERR_RET(errCode); + SCH_RET(schProcessOnJobFailure(pJob, errCode)); } - -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; int32_t code = 0; - SSchTask *pErrTask = pTask; - code = schMoveTaskToSuccList(pJob, pTask, &moved); - if (code && moved) { - SCH_ERR_RET(code); - } + SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask)); - + + SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); + int32_t parentNum = pTask->parents ? 
(int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { int32_t taskDone = 0; @@ -759,14 +771,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { pJob->fetchTask = pTask; - code = schMoveTaskToExecList(pJob, pTask, &moved); - if (code && moved) { - SCH_ERR_RET(code); - } + SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved)); - SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob)); - - return TSDB_CODE_SUCCESS; + SCH_RET(schProcessOnJobPartialSuccess(pJob)); } /* @@ -780,8 +787,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { for (int32_t i = 0; i < parentNum; ++i) { SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i); - pErrTask = par; - int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1); SCH_LOCK(SCH_WRITE, &par->lock); @@ -790,7 +795,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_UNLOCK(SCH_WRITE, &par->lock); if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) { - SCH_ERR_RET(schLaunchTask(pJob, par)); + SCH_ERR_RET(schLaunchTaskImpl(pJob, par)); } } @@ -798,22 +803,55 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pErrTask, code)); + SCH_RET(schProcessOnJobFailure(pJob, code)); +} - SCH_ERR_RET(code); + +// Note: no more error processing, handled in function internal +int32_t schFetchFromRemote(SSchJob *pJob) { + int32_t code = 0; + + if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) { + SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch)); + return TSDB_CODE_SUCCESS; + } + + void *res = atomic_load_ptr(&pJob->res); + if (res) { + atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); + + SCH_JOB_DLOG("res already fetched, res:%p", res); + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); + + return TSDB_CODE_SUCCESS; + +_return: + + 
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); + + SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code)); } + +// Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; + int8_t status = 0; + + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status); + + SCH_RET(atomic_load_32(&pJob->errCode)); + } SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType)); switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: { - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); - } - + SCH_ERR_JRET(rspCode); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; @@ -828,9 +866,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch pJob->resNumOfRows += rsp->affectedRows; #else - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); - } + SCH_ERR_JRET(rspCode); SSubmitRsp *rsp = (SSubmitRsp *)msg; if (rsp) { @@ -845,9 +881,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); @@ -856,9 +894,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_RES_READY_RSP: { SResReadyRsp *rsp = (SResReadyRsp *)msg; - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, 
rspCode)); + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + SCH_ERR_JRET(rsp->code); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -867,14 +907,15 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_FETCH_RSP: { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - + if (pJob->res) { SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); tfree(rsp); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } atomic_store_ptr(&pJob->res, rsp); @@ -886,7 +927,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); - SCH_ERR_JRET(schProcessOnDataFetched(pJob)); + schProcessOnDataFetched(pJob); break; } case TDMT_VND_DROP_TASK_RSP: { @@ -904,9 +945,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); - - SCH_RET(code); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } @@ -1057,7 +1096,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t code = 0; bool isCandidateAddr = false; if (NULL == addr) { - addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); + addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); isCandidateAddr = true; } @@ -1177,28 +1216,17 @@ _return: SCH_RET(code); } -static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { - int8_t status = SCH_GET_JOB_STATUS(pJob); - if (pStatus) { - *pStatus = status; - } - - return (status == JOB_TASK_STATUS_FAILED || status == 
JOB_TASK_STATUS_CANCELLED - || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING); -} - -// Note: no more error processing, handled in function internal -int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { +int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { int8_t status = 0; int32_t code = 0; + + atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status); + SCH_TASK_DLOG("no need to launch task cause of job status, job status:%d", status); - code = atomic_load_32(&pJob->errCode); - SCH_ERR_RET(code); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_RET(atomic_load_32(&pJob->errCode)); } SSubplan *plan = pTask->plan; @@ -1207,38 +1235,69 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) { SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); - SCH_ERR_JRET(code); + SCH_ERR_RET(code); } else { SCH_TASK_DLOG("physical plan len:%d, %s", pTask->msgLen, pTask->msg); } } - SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); + SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); // NOTE: race condition: the task should be put into the hash table before send msg to server if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { - SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); + SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); + + SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); + + return TSDB_CODE_SUCCESS; +} + +// Note: no more error processing, handled in function internal +int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { + bool enough = false; + int32_t 
code = 0; + + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { + SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough)); + + if (enough) { + SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + } + } else { + SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + } + return TSDB_CODE_SUCCESS; _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); - SCH_RET(code); -} -int32_t schLaunchJob(SSchJob *pJob) { - SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); +} - SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING)); - +int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { for (int32_t i = 0; i < level->taskNum; ++i) { SSchTask *pTask = taosArrayGet(level->subTasks, i); + SCH_ERR_RET(schLaunchTask(pJob, pTask)); } + + return TSDB_CODE_SUCCESS; +} + + + +int32_t schLaunchJob(SSchJob *pJob) { + SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING)); + + SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, level)); + + SCH_ERR_RET(schLaunchLevelTasks(pJob, level)); + return TSDB_CODE_SUCCESS; } @@ -1312,6 +1371,8 @@ void schFreeJobImpl(void *job) { for(int32_t i = 0; i < numOfLevels; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + schFreeFlowCtrl(pLevel); + int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); for(int32_t j = 0; j < numOfTasks; ++j) { SSchTask* pTask = taosArrayGet(pLevel->subTasks, j); @@ -1423,10 +1484,11 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { schMgmt.cfg = *cfg; if (schMgmt.cfg.maxJobNum == 0) { - schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; } } else { - schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; + schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM; } schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl); 
@@ -1611,7 +1673,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { + if (!SCH_JOB_NEED_FETCH(pJob)) { SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); taosReleaseRef(schMgmt.jobRef, job); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 80fd7e35b32888f2cff24a2662134d6de14f4fc9..61cc8cd25d07380ba64668174f650d5868dbb8e5 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -535,7 +535,7 @@ TEST(queryTest, normalCase) { char *tablename = "table1"; SVgroupInfo vgInfo = {0}; int64_t job = 0; - SQueryPlan dag = {0}; + SQueryPlan dag; schtInitLogFile();