提交 194722ee 编写于 作者: D dapan1121

feature/qnode

上级 fa9d652e
......@@ -192,6 +192,7 @@ typedef struct SSubplan {
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SQueryNodeStat execNodeStat; // only for scan subplan
SNodeList* pChildren; // the datasource subplan,from which to fetch the result
SNodeList* pParents; // the data destination subplan, get data from current subplan
SPhysiNode* pNode; // physical plan of current subplan
......
......@@ -35,7 +35,6 @@ enum {
JOB_TASK_STATUS_CANCELLING,
JOB_TASK_STATUS_CANCELLED,
JOB_TASK_STATUS_DROPPING,
JOB_TASK_STATUS_FREEING,
};
enum {
......@@ -133,6 +132,10 @@ typedef struct SQueryNodeAddr {
SEpSet epset;
} SQueryNodeAddr;
typedef struct SQueryNodeStat {
double tableNum; //table number in million
} SQueryNodeStat;
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
......
......@@ -25,6 +25,7 @@ extern "C" {
typedef struct SSchedulerCfg {
uint32_t maxJobNum;
double maxNodeTableNum;
} SSchedulerCfg;
typedef struct SQueryProfileSummary {
......
......@@ -44,7 +44,7 @@ static SDatabaseOptions* setDbBlocks(SAstCreateContext* pCxt, SDatabaseOptions*
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_TOTAL_BLOCKS || val > TSDB_MAX_TOTAL_BLOCKS) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option totalBlocks: %d valid range: [%d, %d]", val, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS);
"invalid db option totalBlocks: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS);
pCxt->valid = false;
return pOptions;
}
......@@ -56,7 +56,7 @@ static SDatabaseOptions* setDbCache(SAstCreateContext* pCxt, SDatabaseOptions* p
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_CACHE_BLOCK_SIZE || val > TSDB_MAX_CACHE_BLOCK_SIZE) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option cacheBlockSize: %d valid range: [%d, %d]", val, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MAX_CACHE_BLOCK_SIZE);
"invalid db option cacheBlockSize: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MAX_CACHE_BLOCK_SIZE);
pCxt->valid = false;
return pOptions;
}
......@@ -68,7 +68,7 @@ static SDatabaseOptions* setDbCacheLast(SAstCreateContext* pCxt, SDatabaseOption
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_DB_CACHE_LAST_ROW || val > TSDB_MAX_DB_CACHE_LAST_ROW) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option cacheLast: %d valid range: [%d, %d]", val, TSDB_MIN_DB_CACHE_LAST_ROW, TSDB_MAX_DB_CACHE_LAST_ROW);
"invalid db option cacheLast: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DB_CACHE_LAST_ROW, TSDB_MAX_DB_CACHE_LAST_ROW);
pCxt->valid = false;
return pOptions;
}
......@@ -80,7 +80,7 @@ static SDatabaseOptions* setDbComp(SAstCreateContext* pCxt, SDatabaseOptions* pO
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_COMP_LEVEL || val > TSDB_MAX_COMP_LEVEL) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option compression: %d valid range: [%d, %d]", val, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
"invalid db option compression: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
pCxt->valid = false;
return pOptions;
}
......@@ -92,7 +92,7 @@ static SDatabaseOptions* setDbDays(SAstCreateContext* pCxt, SDatabaseOptions* pO
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_DAYS_PER_FILE || val > TSDB_MAX_DAYS_PER_FILE) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option daysPerFile: %d valid range: [%d, %d]", val, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
"invalid db option daysPerFile: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
pCxt->valid = false;
return pOptions;
}
......@@ -104,7 +104,7 @@ static SDatabaseOptions* setDbFsync(SAstCreateContext* pCxt, SDatabaseOptions* p
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option fsyncPeriod: %d valid range: [%d, %d]", val, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
"invalid db option fsyncPeriod: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
pCxt->valid = false;
return pOptions;
}
......@@ -116,7 +116,7 @@ static SDatabaseOptions* setDbMaxRows(SAstCreateContext* pCxt, SDatabaseOptions*
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_MAX_ROW_FBLOCK || val > TSDB_MAX_MAX_ROW_FBLOCK) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option maxRowsPerBlock: %d valid range: [%d, %d]", val, TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK);
"invalid db option maxRowsPerBlock: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK);
pCxt->valid = false;
return pOptions;
}
......@@ -128,7 +128,7 @@ static SDatabaseOptions* setDbMinRows(SAstCreateContext* pCxt, SDatabaseOptions*
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_MIN_ROW_FBLOCK || val > TSDB_MAX_MIN_ROW_FBLOCK) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option minRowsPerBlock: %d valid range: [%d, %d]", val, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
"invalid db option minRowsPerBlock: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
pCxt->valid = false;
return pOptions;
}
......@@ -140,7 +140,7 @@ static SDatabaseOptions* setDbKeep(SAstCreateContext* pCxt, SDatabaseOptions* pO
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_KEEP || val > TSDB_MAX_KEEP) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option keep: %d valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP);
"invalid db option keep: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP);
pCxt->valid = false;
return pOptions;
}
......@@ -168,7 +168,7 @@ static SDatabaseOptions* setDbQuorum(SAstCreateContext* pCxt, SDatabaseOptions*
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_DB_QUORUM_OPTION || val > TSDB_MAX_DB_QUORUM_OPTION) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option quorum: %d valid range: [%d, %d]", val, TSDB_MIN_DB_QUORUM_OPTION, TSDB_MAX_DB_QUORUM_OPTION);
"invalid db option quorum: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DB_QUORUM_OPTION, TSDB_MAX_DB_QUORUM_OPTION);
pCxt->valid = false;
return pOptions;
}
......@@ -180,7 +180,7 @@ static SDatabaseOptions* setDbReplica(SAstCreateContext* pCxt, SDatabaseOptions*
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_DB_REPLICA_OPTION || val > TSDB_MAX_DB_REPLICA_OPTION) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option replications: %d valid range: [%d, %d]", val, TSDB_MIN_DB_REPLICA_OPTION, TSDB_MAX_DB_REPLICA_OPTION);
"invalid db option replications: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_DB_REPLICA_OPTION, TSDB_MAX_DB_REPLICA_OPTION);
pCxt->valid = false;
return pOptions;
}
......@@ -192,7 +192,7 @@ static SDatabaseOptions* setDbTtl(SAstCreateContext* pCxt, SDatabaseOptions* pOp
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_DB_TTL_OPTION) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option ttl: %d, should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION);
"invalid db option ttl: %" PRId64 ", should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION);
pCxt->valid = false;
return pOptions;
}
......@@ -203,7 +203,7 @@ static SDatabaseOptions* setDbTtl(SAstCreateContext* pCxt, SDatabaseOptions* pOp
static SDatabaseOptions* setDbWal(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) {
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_WAL_LEVEL || val > TSDB_MAX_WAL_LEVEL) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option walLevel: %d, only 1-2 allowed", val);
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option walLevel: %" PRId64 ", only 1-2 allowed", val);
pCxt->valid = false;
return pOptions;
}
......@@ -215,7 +215,7 @@ static SDatabaseOptions* setDbVgroups(SAstCreateContext* pCxt, SDatabaseOptions*
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid db option vgroups: %d valid range: [%d, %d]", val, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
"invalid db option vgroups: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
pCxt->valid = false;
return pOptions;
}
......@@ -226,7 +226,7 @@ static SDatabaseOptions* setDbVgroups(SAstCreateContext* pCxt, SDatabaseOptions*
static SDatabaseOptions* setDbSingleStable(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) {
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_DB_SINGLE_STABLE_OPTION || val > TSDB_MAX_DB_SINGLE_STABLE_OPTION) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option singleStable: %d, only 0-1 allowed", val);
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option singleStable: %" PRId64 ", only 0-1 allowed", val);
pCxt->valid = false;
return pOptions;
}
......@@ -237,7 +237,7 @@ static SDatabaseOptions* setDbSingleStable(SAstCreateContext* pCxt, SDatabaseOpt
static SDatabaseOptions* setDbStreamMode(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) {
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_DB_STREAM_MODE_OPTION || val > TSDB_MAX_DB_STREAM_MODE_OPTION) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option streamMode: %d, only 0-1 allowed", val);
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid db option streamMode: %" PRId64 ", only 0-1 allowed", val);
pCxt->valid = false;
return pOptions;
}
......@@ -269,7 +269,7 @@ static STableOptions* setTableKeep(SAstCreateContext* pCxt, STableOptions* pOpti
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_KEEP || val > TSDB_MAX_KEEP) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid table option keep: %d valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP);
"invalid table option keep: %" PRId64 " valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP);
pCxt->valid = false;
return pOptions;
}
......@@ -281,7 +281,7 @@ static STableOptions* setTableTtl(SAstCreateContext* pCxt, STableOptions* pOptio
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_DB_TTL_OPTION) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid table option ttl: %d, should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION);
"invalid table option ttl: %" PRId64 ", should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION);
pCxt->valid = false;
return pOptions;
}
......@@ -292,7 +292,7 @@ static STableOptions* setTableTtl(SAstCreateContext* pCxt, STableOptions* pOptio
static STableOptions* setTableComment(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal) {
if (pVal->n >= sizeof(pOptions->comments)) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid table option comment, length cannot exceed %d", sizeof(pOptions->comments) - 1);
"invalid table option comment, length cannot exceed %d", (int32_t)(sizeof(pOptions->comments) - 1));
pCxt->valid = false;
return pOptions;
}
......
......@@ -26,8 +26,9 @@ extern "C" {
#include "scheduler.h"
#include "thash.h"
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20.0 /* in million */
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
......@@ -72,17 +73,26 @@ typedef struct SSchCallbackParam {
uint64_t queryId;
int64_t refId;
uint64_t taskId;
SEpSet epSet;
} SSchCallbackParam;
typedef struct SSchFlowControl {
SRWLatch lock;
double tableNumSum;
uint32_t execTaskNum;
SArray *taskList; // Element is SQueryTask*
} SSchFlowControl;
typedef struct SSchLevel {
int32_t level;
int8_t status;
SRWLatch lock;
int32_t taskFailed;
int32_t taskSucceed;
int32_t taskNum;
int32_t taskLaunchIdx; // launch startup index
SArray *subTasks; // Element is SQueryTask
int32_t level;
int8_t status;
SRWLatch lock;
int32_t taskFailed;
int32_t taskSucceed;
int32_t taskNum;
int32_t taskLaunchedNum;
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
SArray *subTasks; // Element is SQueryTask
} SSchLevel;
typedef struct SSchTask {
......@@ -102,13 +112,14 @@ typedef struct SSchTask {
int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
void* handle; // task send handle
void* handle; // task send handle
} SSchTask;
typedef struct SSchJobAttr {
bool needFetch;
bool syncSchedule;
bool queryJob;
bool needFlowCtrl;
} SSchJobAttr;
typedef struct SSchJob {
......@@ -140,6 +151,8 @@ typedef struct SSchJob {
SQueryProfileSummary summary;
} SSchJob;
extern SSchedulerMgmt schMgmt;
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
......@@ -152,8 +165,17 @@ typedef struct SSchJob {
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job)
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epset.eps[(_addr)->epset.inUse])
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epset.inUse = ((_addr)->epset.inUse + 1) % (_addr)->epset.numOfEps)
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
......@@ -173,10 +195,18 @@ typedef struct SSchJob {
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
static int32_t schLaunchTask(SSchJob *job, SSchTask *task);
static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId);
void schFreeFlowCtrl(SSchLevel *pLevel);
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schFetchFromRemote(SSchJob *pJob);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "schedulerInt.h"
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
#include "tref.h"
void schFreeFlowCtrl(SSchLevel *pLevel) {
if (NULL == pLevel->flowCtrl) {
return;
}
SSchFlowControl *ctrl = NULL;
void *pIter = taosHashIterate(pLevel->flowCtrl, NULL);
while (pIter) {
ctrl = (SSchFlowControl *)pIter;
if (ctrl->taskList) {
taosArrayDestroy(ctrl->taskList);
}
pIter = taosHashIterate(pLevel->flowCtrl, pIter);
}
taosHashCleanup(pLevel->flowCtrl);
pLevel->flowCtrl = NULL;
}
int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
if (!SCH_IS_QUERY_JOB(pJob)) {
return TSDB_CODE_SUCCESS;
}
double sum = 0;
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
sum += pTask->plan->execNodeStat.tableNum;
}
if (sum < schMgmt.cfg.maxNodeTableNum) {
return TSDB_CODE_SUCCESS;
}
pLevel->flowCtrl = taosHashInit(pLevel->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pLevel->flowCtrl) {
SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_SET_JOB_NEED_FLOW_CTRL(pJob);
return TSDB_CODE_SUCCESS;
}
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
SSchLevel *pLevel = pTask->level;
SSchFlowControl *ctrl = NULL;
int32_t code = 0;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (ctrl->execTaskNum <= 0) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
--ctrl->execTaskNum;
ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;
_return:
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
SCH_RET(code);
}
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SSchLevel *pLevel = pTask->level;
int32_t code = 0;
SSchFlowControl *ctrl = NULL;
do {
ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp));
if (NULL == ctrl) {
SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1};
code = taosHashPut(pLevel->flowCtrl, SCH_GET_CUR_EP(&pTask->plan->execNode), sizeof(SEp), &nctrl, sizeof(nctrl));
if (code) {
if (HASH_NODE_EXIST(code)) {
continue;
}
SCH_TASK_ELOG("taosHashPut flowCtrl failed, size:%d", (int32_t)sizeof(nctrl));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
*enough = true;
return TSDB_CODE_SUCCESS;
}
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (0 == ctrl->execTaskNum) {
ctrl->tableNumSum = pTask->plan->execNodeStat.tableNum;
++ctrl->execTaskNum;
*enough = true;
break;
}
double sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
if (sum <= schMgmt.cfg.maxNodeTableNum) {
ctrl->tableNumSum = sum;
++ctrl->execTaskNum;
*enough = true;
break;
}
if (NULL == ctrl->taskList) {
ctrl->taskList = taosArrayInit(pLevel->taskNum, POINTER_BYTES);
if (NULL == ctrl->taskList) {
SCH_TASK_ELOG("taosArrayInit taskList failed, size:%d", (int32_t)pLevel->taskNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
if (NULL == taosArrayPush(ctrl->taskList, &pTask)) {
SCH_TASK_ELOG("taosArrayPush to taskList failed, size:%d", (int32_t)taosArrayGetSize(ctrl->taskList));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
*enough = false;
break;
} while (true);
_return:
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
SCH_RET(code);
}
int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
do {
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) {
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
return TSDB_CODE_SUCCESS;
}
SSchTask *pTask = *(SSchTask **)taosArrayPop(ctrl->taskList);
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
bool enough = false;
SCH_ERR_RET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
if (enough) {
SCH_ERR_RET(schLaunchTaskImpl(pJob, pTask));
continue;
}
break;
} while (true);
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
return TSDB_CODE_SUCCESS;
}
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
SSchLevel *pLevel = pTask->level;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_ERR_RET(schLaunchTasksInFlowCtrlListImpl(pJob, ctrl));
}
......@@ -91,6 +91,18 @@ void schFreeTask(SSchTask* pTask) {
}
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (pStatus) {
*pStatus = status;
}
return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED
|| status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING
|| status == JOB_TASK_STATUS_SUCCEED);
}
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType);
......@@ -197,6 +209,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
_return:
SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus);
SCH_ERR_RET(code);
}
......@@ -275,7 +288,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
if (pJob->attr.queryJob && pLevel->taskNum > 1) {
if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) {
SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
......@@ -285,10 +298,9 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
int32_t idx = atomic_load_8(&pTask->candidateIdx);
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx);
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", idx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
......@@ -323,9 +335,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_MAX_TASK_NUM, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == planToTask) {
SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -379,7 +391,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
for (int32_t n = 0; n < taskNum; ++n) {
SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, n);
SCH_SET_JOB_TYPE(&pJob->attr, plan->subplanType);
SCH_SET_JOB_TYPE(pJob, plan->subplanType);
SSchTask task = {0};
SSchTask *pTask = &task;
......@@ -564,15 +576,45 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
}
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info
// TODO set condidateIdx
// TODO record failed but tried task
*needRetry = false;
return TSDB_CODE_SUCCESS;
//TODO CHECK epList/condidateList
if (SCH_IS_DATA_SRC_TASK(pTask)) {
} else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if ((pTask->candidateIdx + 1) >= candidateNum) {
return TSDB_CODE_SUCCESS;
}
++pTask->candidateIdx;
}
}
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
}
if (SCH_IS_DATA_SRC_TASK(pTask)) {
SCH_SWITCH_EPSET(&pTask->plan->execNode);
} else {
++pTask->candidateIdx;
}
SCH_ERR_RET(schLaunchTask(pJob, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
......@@ -588,14 +630,14 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
}
int32_t code = atomic_load_32(&pJob->errCode);
SCH_ERR_RET(code);
SCH_JOB_ELOG("job errCode is invalid, errCode:%d", code);
SCH_JOB_DLOG("job failed with error: %s", tstrerror(code));
SCH_RET(code);
}
// Note: no more error processing, handled in function internal
// Note: no more task error processing, handled in function internal
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode));
}
......@@ -606,38 +648,8 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
}
// Note: no more error processing, handled in function internal
int32_t schFetchFromRemote(SSchJob *pJob) {
int32_t code = 0;
if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) {
SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
return TSDB_CODE_SUCCESS;
}
void *res = atomic_load_ptr(&pJob->res);
if (res) {
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
SCH_JOB_DLOG("res already fetched, res:%p", res);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
return TSDB_CODE_SUCCESS;
_return:
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
schProcessOnJobFailure(pJob, code);
return code;
}
// Note: no more error processing, handled in function internal
// Note: no more task error processing, handled in function internal
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
int32_t code = 0;
......@@ -655,9 +667,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
_return:
SCH_ERR_RET(schProcessOnJobFailure(pJob, code));
SCH_RET(code);
SCH_RET(schProcessOnJobFailure(pJob, code));
}
int32_t schProcessOnDataFetched(SSchJob *job) {
......@@ -665,8 +675,16 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
tsem_post(&job->rspSem);
}
// Note: no more error processing, handled in function internal
// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("task failed not processed cause of job status, job status:%d", status);
SCH_RET(atomic_load_32(&pJob->errCode));
}
bool needRetry = false;
bool moved = false;
int32_t taskDone = 0;
......@@ -674,16 +692,16 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));
SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));
if (!needRetry) {
SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
code = schMoveTaskToFailList(pJob, pTask, &moved);
if (code && moved) {
SCH_ERR_RET(errCode);
}
SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));
} else {
SCH_TASK_DLOG("task already done, no more failure process, status:%d", SCH_GET_TASK_STATUS(pTask));
return TSDB_CODE_SUCCESS;
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
......@@ -702,35 +720,29 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
}
}
} else {
// Note: no more error processing, already handled
SCH_ERR_RET(schLaunchTask(pJob, pTask));
SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));
return TSDB_CODE_SUCCESS;
}
_return:
SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode));
SCH_ERR_RET(errCode);
SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
// Note: no more error processing, handled in function internal
// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
int32_t code = 0;
SSchTask *pErrTask = pTask;
code = schMoveTaskToSuccList(pJob, pTask, &moved);
if (code && moved) {
SCH_ERR_RET(code);
}
SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
if (parentNum == 0) {
int32_t taskDone = 0;
......@@ -759,14 +771,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
pJob->fetchTask = pTask;
code = schMoveTaskToExecList(pJob, pTask, &moved);
if (code && moved) {
SCH_ERR_RET(code);
}
SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved));
SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob));
return TSDB_CODE_SUCCESS;
SCH_RET(schProcessOnJobPartialSuccess(pJob));
}
/*
......@@ -780,8 +787,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
for (int32_t i = 0; i < parentNum; ++i) {
SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
pErrTask = par;
int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1);
SCH_LOCK(SCH_WRITE, &par->lock);
......@@ -790,7 +795,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_UNLOCK(SCH_WRITE, &par->lock);
if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
SCH_ERR_RET(schLaunchTask(pJob, par));
SCH_ERR_RET(schLaunchTaskImpl(pJob, par));
}
}
......@@ -798,22 +803,55 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
_return:
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pErrTask, code));
SCH_RET(schProcessOnJobFailure(pJob, code));
}
SCH_ERR_RET(code);
// Note: no more error processing, handled in function internal
int32_t schFetchFromRemote(SSchJob *pJob) {
int32_t code = 0;
if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) {
SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
return TSDB_CODE_SUCCESS;
}
void *res = atomic_load_ptr(&pJob->res);
if (res) {
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
SCH_JOB_DLOG("res already fetched, res:%p", res);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
return TSDB_CODE_SUCCESS;
_return:
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}
// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0;
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status);
SCH_RET(atomic_load_32(&pJob->errCode));
}
SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));
switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
SCH_ERR_JRET(rspCode);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
......@@ -828,9 +866,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
pJob->resNumOfRows += rsp->affectedRows;
#else
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
SCH_ERR_JRET(rspCode);
SSubmitRsp *rsp = (SSubmitRsp *)msg;
if (rsp) {
......@@ -845,9 +881,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_JRET(rsp->code);
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));
......@@ -856,9 +894,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_RES_READY_RSP: {
SResReadyRsp *rsp = (SResReadyRsp *)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_JRET(rsp->code);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
......@@ -867,14 +907,15 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (pJob->res) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res);
tfree(rsp);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
atomic_store_ptr(&pJob->res, rsp);
......@@ -886,7 +927,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
SCH_ERR_JRET(schProcessOnDataFetched(pJob));
schProcessOnDataFetched(pJob);
break;
}
case TDMT_VND_DROP_TASK_RSP: {
......@@ -904,9 +945,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
_return:
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code));
SCH_RET(code);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
......@@ -1057,7 +1096,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
int32_t code = 0;
bool isCandidateAddr = false;
if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
isCandidateAddr = true;
}
......@@ -1177,28 +1216,17 @@ _return:
SCH_RET(code);
}
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (pStatus) {
*pStatus = status;
}
return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED
|| status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING);
}
// Note: no more error processing, handled in function internal
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int8_t status = 0;
int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status);
SCH_TASK_DLOG("no need to launch task cause of job status, job status:%d", status);
code = atomic_load_32(&pJob->errCode);
SCH_ERR_RET(code);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
SCH_RET(atomic_load_32(&pJob->errCode));
}
SSubplan *plan = pTask->plan;
......@@ -1207,38 +1235,69 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen);
SCH_ERR_JRET(code);
SCH_ERR_RET(code);
} else {
SCH_TASK_DLOG("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
}
}
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
// NOTE: race condition: the task should be put into the hash table before send msg to server
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
return TSDB_CODE_SUCCESS;
}
// Note: no more error processing, handled in function internal
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
bool enough = false;
int32_t code = 0;
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
if (enough) {
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
}
} else {
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
}
return TSDB_CODE_SUCCESS;
_return:
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code));
SCH_RET(code);
}
int32_t schLaunchJob(SSchJob *pJob) {
SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
for (int32_t i = 0; i < level->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(level->subTasks, i);
SCH_ERR_RET(schLaunchTask(pJob, pTask));
}
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchJob(SSchJob *pJob) {
SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));
SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, level));
SCH_ERR_RET(schLaunchLevelTasks(pJob, level));
return TSDB_CODE_SUCCESS;
}
......@@ -1312,6 +1371,8 @@ void schFreeJobImpl(void *job) {
for(int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
schFreeFlowCtrl(pLevel);
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchTask* pTask = taosArrayGet(pLevel->subTasks, j);
......@@ -1423,10 +1484,11 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt.cfg = *cfg;
if (schMgmt.cfg.maxJobNum == 0) {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
}
} else {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
}
schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
......@@ -1611,7 +1673,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (!SCH_JOB_NEED_FETCH(&pJob->attr)) {
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob));
taosReleaseRef(schMgmt.jobRef, job);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......
......@@ -535,7 +535,7 @@ TEST(queryTest, normalCase) {
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
int64_t job = 0;
SQueryPlan dag = {0};
SQueryPlan dag;
schtInitLogFile();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册