提交 b3e8e7ba 编写于 作者: D dapan1121

fix: fix scheduler no resp issue

上级 473ac84f
......@@ -175,7 +175,7 @@ typedef struct SSchLevel {
int32_t taskNum;
int32_t taskLaunchedNum;
int32_t taskDoneNum;
SArray *subTasks; // Element is SQueryTask
SArray *subTasks; // Element is SSchTask
} SSchLevel;
typedef struct SSchTaskProfile {
......@@ -213,6 +213,7 @@ typedef struct SSchTask {
typedef struct SSchJobAttr {
EExplainMode explainMode;
bool queryJob;
bool needFetch;
bool needFlowCtrl;
} SSchJobAttr;
......@@ -318,11 +319,11 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job)
#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch)
#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
......
......@@ -230,9 +230,16 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) {
SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
if (SCH_IS_QUERY_JOB(pJob)) {
if (pLevel->taskNum > 1) {
SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask* pTask = taosArrayGet(pLevel->subTasks, 0);
if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType) {
pJob->attr.needFetch = true;
}
}
return TSDB_CODE_SUCCESS;
......@@ -371,9 +378,12 @@ _return:
int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) {
pRes->code = atomic_load_32(&pJob->errCode);
pRes->numOfRows = pJob->resNumOfRows;
memcpy(pRes, &pJob->execRes, sizeof(pJob->execRes));
pRes->res = pJob->execRes.res;
pRes->msgType = pJob->execRes.msgType;
pJob->execRes.res = NULL;
SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code));
return TSDB_CODE_SUCCESS;
}
......@@ -434,12 +444,12 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) {
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
if (SCH_OP_NULL == pJob->opStatus.op) {
SCH_JOB_DLOG("job not in any op, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
return;
}
if (op && pJob->opStatus.op != op) {
SCH_JOB_ELOG("job in op %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
return;
}
......@@ -754,23 +764,21 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
switch (type) {
case SCH_OP_EXEC:
/*
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
}
*/
if (pReq && pReq->syncReq) {
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
}
schDumpJobExecRes(pJob, pReq->pExecRes);
}
break;
case SCH_OP_FETCH:
/*
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
if (pReq && pReq->syncReq) {
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
}
}
*/
break;
case SCH_OP_GET_STATUS:
errCode = TSDB_CODE_SUCCESS;
......
......@@ -170,7 +170,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);
if (SCH_IS_WAIT_ALL_JOB(pJob)) {
if (SCH_JOB_NEED_WAIT(pJob)) {
SCH_LOCK(SCH_WRITE, &pTask->level->lock);
pTask->level->taskFailed++;
taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
......@@ -212,7 +212,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
if (parentNum == 0) {
int32_t taskDone = 0;
if (SCH_IS_WAIT_ALL_JOB(pJob)) {
if (SCH_JOB_NEED_WAIT(pJob)) {
SCH_LOCK(SCH_WRITE, &pTask->level->lock);
pTask->level->taskSucceed++;
taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
......@@ -792,7 +792,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
}
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
if (!SCH_IS_NEED_DROP_JOB(pJob)) {
if (!SCH_JOB_NEED_DROP(pJob)) {
return;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册