提交 93e94127 编写于 作者: D dapan1121

feature/qnode

上级 180eba5b
...@@ -35,6 +35,7 @@ enum { ...@@ -35,6 +35,7 @@ enum {
JOB_TASK_STATUS_CANCELLING, JOB_TASK_STATUS_CANCELLING,
JOB_TASK_STATUS_CANCELLED, JOB_TASK_STATUS_CANCELLED,
JOB_TASK_STATUS_DROPPING, JOB_TASK_STATUS_DROPPING,
JOB_TASK_STATUS_FREEING,
}; };
typedef struct STableComInfo { typedef struct STableComInfo {
......
...@@ -61,14 +61,17 @@ typedef struct SSchLevel { ...@@ -61,14 +61,17 @@ typedef struct SSchLevel {
typedef struct SSchTask { typedef struct SSchTask {
uint64_t taskId; // task id uint64_t taskId; // task id
SRWLatch lock; // task lock
SSchLevel *level; // level SSchLevel *level; // level
SSubplan *plan; // subplan SSubplan *plan; // subplan
char *msg; // operator tree char *msg; // operator tree
int32_t msgLen; // msg length int32_t msgLen; // msg length
int8_t status; // task status int8_t status; // task status
SQueryNodeAddr execAddr; // task actual executed node address int32_t lastMsgType; // last sent msg type
SQueryNodeAddr succeedAddr; // task executed success node address
int8_t candidateIdx; // current try condidation index int8_t candidateIdx; // current try condidation index
SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
SArray *execAddrs; // all tried node for current task, element is SQueryNodeAddr
SQueryProfileSummary summary; // task execution summary SQueryProfileSummary summary; // task execution summary
int32_t childReady; // child task ready number int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
...@@ -96,22 +99,24 @@ typedef struct SSchJob { ...@@ -96,22 +99,24 @@ typedef struct SSchJob {
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
int32_t ref;
int8_t status; int8_t status;
SQueryNodeAddr resNode; SQueryNodeAddr resNode;
tsem_t rspSem; tsem_t rspSem;
int32_t userFetch; int8_t userFetch;
int32_t remoteFetch; int32_t remoteFetch;
SSchTask *fetchTask; SSchTask *fetchTask;
int32_t errCode; int32_t errCode;
void *res; void *res; //TODO free it or not
int32_t resNumOfRows; int32_t resNumOfRows;
SQueryProfileSummary summary; SQueryProfileSummary summary;
} SSchJob; } SSchJob;
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children)) #define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
...@@ -127,6 +132,7 @@ typedef struct SSchJob { ...@@ -127,6 +132,7 @@ typedef struct SSchJob {
#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) #define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) #define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) qWarn("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
...@@ -137,7 +143,7 @@ typedef struct SSchJob { ...@@ -137,7 +143,7 @@ typedef struct SSchJob {
static int32_t schLaunchTask(SSchJob *job, SSchTask *task); static int32_t schLaunchTask(SSchJob *job, SSchTask *task);
static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType); static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,9 +20,75 @@ ...@@ -20,9 +20,75 @@
static SSchedulerMgmt schMgmt = {0}; static SSchedulerMgmt schMgmt = {0};
int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) {
int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan;
pTask->level = pLevel;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
pTask->execAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
if (NULL == pTask->execAddrs) {
SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CONDIDATE_EP_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
void schFreeTask(SSchTask* pTask) {
if (pTask->candidateAddrs) {
taosArrayDestroy(pTask->candidateAddrs);
}
// TODO NEED TO VERFY WITH ASYNC_SEND MEMORY FREE
//tfree(pTask->msg);
if (pTask->children) {
taosArrayDestroy(pTask->children);
}
if (pTask->parents) {
taosArrayDestroy(pTask->parents);
}
}
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType);
switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP:
case TDMT_VND_SUBMIT_RSP:
case TDMT_VND_QUERY_RSP:
case TDMT_VND_RES_READY_RSP:
case TDMT_VND_FETCH_RSP:
case TDMT_VND_DROP_TASK:
if (lastMsgType != (msgType - 1)) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%d, rspType:%d", lastMsgType, msgType);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
break;
default:
SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
return TSDB_CODE_SUCCESS;
}
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
int32_t code = 0; int32_t code = 0;
int8_t oriStatus = SCH_GET_JOB_STATUS(pJob);
/* /*
if (oriStatus == newStatus) { if (oriStatus == newStatus) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
...@@ -80,6 +146,10 @@ int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) { ...@@ -80,6 +146,10 @@ int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) {
} }
*/ */
SCH_SET_JOB_STATUS(pJob, newStatus);
SCH_JOB_DLOG("status updated from %d to %d", oriStatus, newStatus);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
...@@ -137,6 +207,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { ...@@ -137,6 +207,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum); SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} else {
if (0 != pLevel->level) {
SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
} }
for (int32_t n = 0; n < parentNum; ++n) { for (int32_t n = 0; n < parentNum; ++n) {
...@@ -166,19 +241,28 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { ...@@ -166,19 +241,28 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan; int32_t schRecordTaskSucceedNode(SSchTask *pTask) {
pTask->level = pLevel; SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); assert(NULL != addr);
pTask->succeedAddr = *addr;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schFreeTask(SSchTask* pTask) {
taosArrayDestroy(pTask->candidateAddrs); int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr) {
if (NULL == taosArrayPush(pTask->execAddrs, addr)) {
SCH_TASK_ELOG("taosArrayPush addr to execAddr list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
} }
int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
int32_t code = 0; int32_t code = 0;
...@@ -257,7 +341,7 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { ...@@ -257,7 +341,7 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
SSchTask task = {0}; SSchTask task = {0};
SSchTask *pTask = &task; SSchTask *pTask = &task;
schInitTask(pJob, &task, plan, pLevel); SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));
void *p = taosArrayPush(pLevel->subTasks, &task); void *p = taosArrayPush(pLevel->subTasks, &task);
if (NULL == p) { if (NULL == p) {
...@@ -346,46 +430,67 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { ...@@ -346,46 +430,67 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) { if (0 != code) {
if (HASH_NODE_EXIST(code)) { if (HASH_NODE_EXIST(code)) {
SCH_TASK_ELOG("task already in exec list, code:%x", code); SCH_TASK_ELOG("task already in execTask list, code:%x", code);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
SCH_TASK_ELOG("taosHashPut task to exec list failed, errno:%d", errno); SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("task added to exec list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId); SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
return TSDB_CODE_SUCCESS;
} }
if (0 != taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
qError("taosHashPut failed"); if (0 != code) {
if (HASH_NODE_EXIST(code)) {
*moved = true;
SCH_TASK_ELOG("task already in succTask list, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
*moved = true; *moved = true;
SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { *moved = false;
qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId);
if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
} }
if (0 != taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
qError("taosHashPut failed"); if (0 != code) {
if (HASH_NODE_EXIST(code)) {
*moved = true;
SCH_TASK_WLOG("task already in failTask list, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
*moved = true; *moved = true;
SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -393,6 +498,8 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { ...@@ -393,6 +498,8 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info // TODO if needRetry, set task retry info
// TODO set condidateIdx
// TODO record failed but tried task
*needRetry = false; *needRetry = false;
...@@ -400,60 +507,70 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b ...@@ -400,60 +507,70 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b
} }
int32_t schFetchFromRemote(SSchJob *job) { int32_t schFetchFromRemote(SSchJob *pJob) {
int32_t code = 0; int32_t code = 0;
if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) {
qInfo("prior fetching not finished"); SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SCH_ERR_JRET(schBuildAndSendMsg(job, job->fetchTask, TDMT_VND_FETCH)); if (atomic_load_ptr(&pJob->res))
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
return code; return code;
} }
int32_t schProcessOnJobPartialSuccess(SSchJob *job) { // Note: no more error processing, handled in function internal
job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED));
bool needFetch = job->userFetch;
if ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule) {
tsem_post(&job->rspSem);
}
if (needFetch) { if (errCode) {
SCH_ERR_RET(schFetchFromRemote(job)); atomic_store_32(&pJob->errCode, errCode);
} }
return TSDB_CODE_SUCCESS; if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) {
tsem_post(&pJob->rspSem);
}
SCH_ERR_RET(atomic_load_32(&pJob->errCode));
assert(0);
} }
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (schValidateStatus(pJob, status, JOB_TASK_STATUS_FAILED)) { // Note: no more error processing, handled in function internal
SCH_ERR_RET(atomic_load_32(&pJob->errCode)); int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
} int32_t code = 0;
SCH_SET_JOB_STATUS(pJob, JOB_TASK_STATUS_FAILED);
atomic_store_32(&pJob->errCode, errCode); SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED));
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
if (pJob->userFetch || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) { if ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule) {
tsem_post(&pJob->rspSem); tsem_post(&pJob->rspSem);
} }
if (atomic_load_8(&pJob->userFetch)) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return:
SCH_ERR_RET(schProcessOnJobFailure(pJob, code));
SCH_RET(code);
} }
int32_t schProcessOnDataFetched(SSchJob *job) { int32_t schProcessOnDataFetched(SSchJob *job) {
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
...@@ -461,25 +578,77 @@ int32_t schProcessOnDataFetched(SSchJob *job) { ...@@ -461,25 +578,77 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
} }
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
// Note: no more error processing, handled in function internal
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
bool needRetry = false;
bool moved = false; bool moved = false;
int32_t taskDone = 0;
int32_t code = 0; int32_t code = 0;
SCH_TASK_DLOG("taskOnFailure, code:%x", errCode);
SCH_ERR_RET(schMoveTaskToSuccList(pJob, pTask, &moved)); SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));
if (!moved) {
SCH_TASK_ELOG(" task may already moved, status:%d", pTask->status); if (!needRetry) {
SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode);
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
code = schMoveTaskToFailList(pJob, pTask, &moved);
if (code && moved) {
SCH_ERR_RET(errCode);
}
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
SCH_LOCK(SCH_WRITE, &pTask->level->lock);
pTask->level->taskFailed++;
taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
atomic_store_32(&pJob->errCode, errCode);
if (taskDone < pTask->level->taskNum) {
SCH_TASK_DLOG("not all tasks done, done:%d, all:%d", taskDone, pTask->level->taskNum);
SCH_ERR_RET(errCode);
}
}
} else {
// Note: no more error processing, already handled
SCH_ERR_RET(schLaunchTask(pJob, pTask));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
_return:
SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode));
SCH_ERR_RET(errCode);
}
// Note: no more error processing, handled in function internal
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
int32_t code = 0;
SSchTask *pErrTask = pTask;
SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask));
code = schMoveTaskToSuccList(pJob, pTask, &moved);
if (code && moved) {
SCH_ERR_RET(code);
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
SCH_ERR_JRET(schRecordTaskSucceedNode(pTask));
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
if (parentNum == 0) { if (parentNum == 0) {
if (pTask->level->level != 0) {
SCH_TASK_ELOG("no parent task level error, level:%d", pTask->level->level);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t taskDone = 0; int32_t taskDone = 0;
if (SCH_TASK_NEED_WAIT_ALL(pTask)) { if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
...@@ -490,22 +659,23 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -490,22 +659,23 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
if (taskDone < pTask->level->taskNum) { if (taskDone < pTask->level->taskNum) {
SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (taskDone > pTask->level->taskNum) { } else if (taskDone > pTask->level->taskNum) {
assert(0); assert(0);
} }
if (pTask->level->taskFailed > 0) { if (pTask->level->taskFailed > 0) {
pJob->status = JOB_TASK_STATUS_FAILED; SCH_RET(schProcessOnJobFailure(pJob, 0));
SCH_ERR_RET(schProcessOnJobFailure(pJob, TSDB_CODE_QRY_APP_ERROR)); } else {
SCH_RET(schProcessOnJobPartialSuccess(pJob));
return TSDB_CODE_SUCCESS;
} }
} else { } else {
pJob->resNode = pTask->execAddr; pJob->resNode = pTask->succeedAddr;
} }
pJob->fetchTask = pTask; pJob->fetchTask = pTask;
SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob)); SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -522,14 +692,18 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -522,14 +692,18 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
for (int32_t i = 0; i < parentNum; ++i) { for (int32_t i = 0; i < parentNum; ++i) {
SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i); SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
pErrTask = par;
atomic_add_fetch_32(&par->childReady, 1); atomic_add_fetch_32(&par->childReady, 1);
code = qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr); SCH_LOCK(SCH_WRITE, &par->lock);
code = qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->succeedAddr);
if (code) { if (code) {
SCH_UNLOCK(SCH_WRITE, &par->lock);
SCH_TASK_ELOG("qSetSubplanExecutionNode failed, code:%x, templateId:%"PRIx64, code, pTask->plan->id.templateId); SCH_TASK_ELOG("qSetSubplanExecutionNode failed, code:%x, templateId:%"PRIx64, code, pTask->plan->id.templateId);
SCH_ERR_RET(code); SCH_ERR_JRET(code);
} }
SCH_UNLOCK(SCH_WRITE, &par->lock);
if (SCH_TASK_READY_TO_LUNCH(par)) { if (SCH_TASK_READY_TO_LUNCH(par)) {
SCH_ERR_RET(schLaunchTask(pJob, par)); SCH_ERR_RET(schLaunchTask(pJob, par));
...@@ -537,158 +711,142 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -537,158 +711,142 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
bool needRetry = false;
bool moved = false;
int32_t taskDone = 0;
SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));
if (!needRetry) {
SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode);
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved));
if (!moved) {
SCH_TASK_ELOG("task may already moved, status:%d", pTask->status);
}
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
SCH_LOCK(SCH_WRITE, &pTask->level->lock);
pTask->level->taskFailed++;
taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
if (taskDone < pTask->level->taskNum) {
qDebug("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
return TSDB_CODE_SUCCESS;
}
}
SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode));
return errCode; _return:
}
SCH_ERR_RET(schLaunchTask(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskFailure(pJob, pErrTask, code));
return TSDB_CODE_SUCCESS; SCH_ERR_RET(code);
} }
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));
switch (msgType) { switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP: { case TDMT_VND_CREATE_TABLE_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
} else {
code = schProcessOnTaskSuccess(job, task);
if (code) {
goto _task_error;
} }
}
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
} }
case TDMT_VND_SUBMIT_RSP: { case TDMT_VND_SUBMIT_RSP: {
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) { #if 0 //TODO OPEN THIS
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
} else {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
job->resNumOfRows += rsp->affectedRows; SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
code = schProcessOnTaskSuccess(job, task);
if (code) { pJob->resNumOfRows += rsp->affectedRows;
goto _task_error; #else
} if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
} }
#endif
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
} }
case TDMT_VND_QUERY_RSP: { case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg; SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) { if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code));
} else {
code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY);
if (code) {
goto _task_error;
}
} }
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));
break; break;
} }
case TDMT_VND_RES_READY_RSP: { case TDMT_VND_RES_READY_RSP: {
SResReadyRsp *rsp = (SResReadyRsp *)msg; SResReadyRsp *rsp = (SResReadyRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) { if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code));
} else {
code = schProcessOnTaskSuccess(job, task);
if (code) {
goto _task_error;
}
} }
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
} }
case TDMT_VND_FETCH_RSP: { case TDMT_VND_FETCH_RSP: {
SCH_ERR_JRET(rspCode);
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
job->res = rsp; if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) {
if (rsp) { SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
job->resNumOfRows = rsp->numOfRows;
} }
atomic_store_ptr(&pJob->res, rsp);
atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows);
SCH_ERR_JRET(schProcessOnDataFetched(pJob));
SCH_ERR_JRET(schProcessOnDataFetched(job));
break; break;
} }
case TDMT_VND_DROP_TASK: { case TDMT_VND_DROP_TASK: {
// SHOULD NEVER REACH HERE
assert(0);
break;
} }
default: default:
qError("unknown msg type:%d received", msgType); SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask));
return TSDB_CODE_QRY_INVALID_INPUT;
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_task_error:
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code));
return TSDB_CODE_SUCCESS;
_return: _return:
code = schProcessOnJobFailure(job, code);
return code; SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code));
SCH_RET(code);
} }
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SSchCallbackParam *pParam = (SSchCallbackParam *)param; SSchCallbackParam *pParam = (SSchCallbackParam *)param;
SSchJob *pJob = NULL;
SSchTask *pTask = NULL;
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == job || NULL == (*job)) { if (NULL == job || NULL == (*job)) {
qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId); qError("QID:%"PRIx64" taosHashGet queryId not exist", pParam->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
int32_t s = taosHashGetSize((*job)->execTasks); pJob = *job;
atomic_add_fetch_32(&pJob->ref, 1);
int32_t s = taosHashGetSize(pJob->execTasks);
assert(s != 0); assert(s != 0);
SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) { if (NULL == task || NULL == (*task)) {
qError("failed to get task, taskId:%"PRIx64" not exist, reqId:0x%"PRIx64, pParam->taskId, (*job)->queryId); qError("QID:%"PRIx64",TID:%"PRIx64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
pTask = *task;
SCH_TASK_DLOG("rsp msg received, type:%d, code:%x", msgType, rspCode);
schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
_return: _return:
if (pJob) {
atomic_sub_fetch_32(&pJob->ref, 1);
}
tfree(param); tfree(param);
SCH_RET(code); SCH_RET(code);
} }
...@@ -715,7 +873,7 @@ int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) ...@@ -715,7 +873,7 @@ int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code)
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSchCallbackParam *pParam = (SSchCallbackParam *)param; SSchCallbackParam *pParam = (SSchCallbackParam *)param;
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code); qDebug("QID:%"PRIx64",TID:%"PRIx64" drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
} }
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
...@@ -739,7 +897,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { ...@@ -739,7 +897,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
*fp = schHandleDropCallback; *fp = schHandleDropCallback;
break; break;
default: default:
qError("unknown msg type:%d", msgType); qError("unknown msg type for callback, msgType:%d", msgType);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -771,12 +929,15 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t ...@@ -771,12 +929,15 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize; pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgType = msgType; pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp; pMsgSendInfo->fp = fp;
int64_t transporterId = 0; int64_t transporterId = 0;
SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo)); code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo);
if (code) {
qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code);
SCH_ERR_JRET(code);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -798,15 +959,19 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { ...@@ -798,15 +959,19 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
} }
} }
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
uint32_t msgSize = 0; uint32_t msgSize = 0;
void *msg = NULL; void *msg = NULL;
int32_t code = 0; int32_t code = 0;
bool isCandidateAddr = false;
SEpSet epSet; SEpSet epSet;
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
isCandidateAddr = true;
}
schConvertAddrToEpSet(addr, &epSet); schConvertAddrToEpSet(addr, &epSet);
switch (msgType) { switch (msgType) {
...@@ -887,18 +1052,25 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { ...@@ -887,18 +1052,25 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
break; break;
} }
default: default:
SCH_TASK_ELOG("unknown msg type:%d", msgType); SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
break; break;
} }
atomic_store_32(&pTask->lastMsgType, msgType);
SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
if (isCandidateAddr) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr));
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
atomic_store_32(&pTask->lastMsgType, -1);
tfree(msg); tfree(msg);
SCH_RET(code); SCH_RET(code);
} }
...@@ -913,13 +1085,19 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { ...@@ -913,13 +1085,19 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
|| status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING); || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING);
} }
// Note: no more error processing, handled in function internal
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
int8_t status = 0; int8_t status = 0;
int32_t code = 0; int32_t code = 0;
if (schJobNeedToStop(pJob, &status)) { if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status); SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status);
SCH_ERR_RET(atomic_load_32(&pJob->errCode));
code = atomic_load_32(&pJob->errCode);
SCH_ERR_RET(code);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
SSubplan *plan = pTask->plan; SSubplan *plan = pTask->plan;
...@@ -941,56 +1119,68 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -941,56 +1119,68 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
} }
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, plan->msgType)); SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
code = schProcessOnTaskFailure(pJob, pTask, code); SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code));
SCH_RET(code); SCH_RET(code);
} }
int32_t schLaunchJob(SSchJob *pJob) { int32_t schLaunchJob(SSchJob *pJob) {
SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));
for (int32_t i = 0; i < level->taskNum; ++i) { for (int32_t i = 0; i < level->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(level->subTasks, i); SSchTask *pTask = taosArrayGet(level->subTasks, i);
SCH_ERR_RET(schLaunchTask(pJob, pTask)); SCH_ERR_RET(schLaunchTask(pJob, pTask));
} }
pJob->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schDropJobAllTasks(SSchJob *job) { void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
void *pIter = taosHashIterate(job->succTasks, NULL); if (NULL == pTask->execAddrs) {
while (pIter) { SCH_TASK_DLOG("no exec address, status:%d", SCH_GET_TASK_STATUS(pTask));
SSchTask *task = *(SSchTask **)pIter; return;
}
int32_t msgType = task->plan->msgType; int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs);
if (msgType == TDMT_VND_CREATE_TABLE || msgType == TDMT_VND_SUBMIT) {
break; if (size <= 0) {
} SCH_TASK_DLOG("empty exec address, status:%d", SCH_GET_TASK_STATUS(pTask));
return;
}
schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); SQueryNodeAddr *addr = NULL;
pIter = taosHashIterate(job->succTasks, pIter); for (int32_t i = 0; i < size; ++i) {
} addr = (SQueryNodeAddr *)taosArrayGet(pTask->execAddrs, i);
pIter = taosHashIterate(job->failTasks, NULL); schBuildAndSendMsg(pJob, pTask, addr, TDMT_VND_DROP_TASK);
}
}
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
void *pIter = taosHashIterate(list, NULL);
while (pIter) { while (pIter) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *pTask = *(SSchTask **)pIter;
int32_t msgType = task->plan->msgType; if (!SCH_TASK_NO_NEED_DROP(pTask)) {
if (msgType == TDMT_VND_CREATE_TABLE || msgType == TDMT_VND_SUBMIT) { schDropTaskOnExecutedNode(pJob, pTask);
break;
} }
pIter = taosHashIterate(list, pIter);
}
}
schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); void schDropJobAllTasks(SSchJob *pJob) {
pIter = taosHashIterate(job->succTasks, pIter); schDropTaskInHashList(pJob, pJob->execTasks);
} schDropTaskInHashList(pJob, pJob->succTasks);
schDropTaskInHashList(pJob, pJob->failTasks);
} }
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) { int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) {
...@@ -1066,6 +1256,13 @@ _return: ...@@ -1066,6 +1256,13 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schCancelJob(SSchJob *pJob) {
//TODO
//TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
}
int32_t schedulerInit(SSchedulerCfg *cfg) { int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.jobs) { if (schMgmt.jobs) {
...@@ -1104,10 +1301,12 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void ...@@ -1104,10 +1301,12 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true)); SSchJob *job = NULL;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, (void **)&job, true));
*pJob = job;
SSchJob *job = *(SSchJob **)pJob;
pRes->code = atomic_load_32(&job->errCode); pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows; pRes->numOfRows = job->resNumOfRows;
...@@ -1119,93 +1318,129 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, ...@@ -1119,93 +1318,129 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
return schExecJobImpl(transport, nodeList, pDag, pJob, false); SSchJob *job = NULL;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, (void **)&job, false));
*pJob = job;
return TSDB_CODE_SUCCESS;
} }
int32_t scheduleFetchRows(void *pJob, void **data) { int32_t scheduleFetchRows(void *job, void **data) {
if (NULL == pJob || NULL == data) { if (NULL == job || NULL == data) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SSchJob *job = pJob; SSchJob *pJob = job;
int32_t code = 0; int32_t code = 0;
if (!SCH_JOB_NEED_FETCH(&job->attr)) { atomic_add_fetch_32(&pJob->ref, 1);
qError("no need to fetch data");
if (!SCH_JOB_NEED_FETCH(&pJob->attr)) {
SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob));
atomic_sub_fetch_32(&pJob->ref, 1);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
if (job->status == JOB_TASK_STATUS_FAILED) { if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
job->res = NULL; SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
SCH_RET(atomic_load_32(&job->errCode)); atomic_sub_fetch_32(&pJob->ref, 1);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
if (job->status == JOB_TASK_STATUS_SUCCEED) { int8_t status = SCH_GET_JOB_STATUS(pJob);
job->res = NULL;
return TSDB_CODE_SUCCESS;
}
if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { if (status == JOB_TASK_STATUS_FAILED) {
qError("prior fetching not finished"); *data = atomic_load_ptr(&pJob->res);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); atomic_store_ptr(&pJob->res, NULL);
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
} else if (status == JOB_TASK_STATUS_SUCCEED) {
*data = atomic_load_ptr(&pJob->res);
atomic_store_ptr(&pJob->res, NULL);
goto _return;
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
} }
if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { tsem_wait(&pJob->rspSem);
SCH_ERR_JRET(schFetchFromRemote(job));
}
tsem_wait(&job->rspSem); status = SCH_GET_JOB_STATUS(pJob);
if (job->status == JOB_TASK_STATUS_FAILED) { if (status == JOB_TASK_STATUS_FAILED) {
code = atomic_load_32(&job->errCode); code = atomic_load_32(&pJob->errCode);
} }
if (job->res && ((SRetrieveTableRsp *)job->res)->completed) { if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) {
job->status = JOB_TASK_STATUS_SUCCEED; SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
} }
*data = job->res; while (true) {
job->res = NULL; *data = atomic_load_ptr(&pJob->res);
if (*data != atomic_val_compare_exchange_ptr(&pJob->res, *data, NULL)) {
continue;
}
break;
}
_return: _return:
atomic_val_compare_exchange_32(&job->userFetch, 1, 0);
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
atomic_sub_fetch_32(&pJob->ref, 1);
SCH_RET(code); SCH_RET(code);
} }
int32_t scheduleCancelJob(void *pJob) { int32_t scheduleCancelJob(void *job) {
//TODO SSchJob *pJob = (SSchJob *)job;
//TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST atomic_add_fetch_32(&pJob->ref, 1);
return TSDB_CODE_SUCCESS; int32_t code = schCancelJob(pJob);
atomic_sub_fetch_32(&pJob->ref, 1);
SCH_RET(code);
} }
void scheduleFreeJob(void *pJob) { void scheduleFreeJob(void *job) {
if (NULL == pJob) { if (NULL == job) {
return; return;
} }
SSchJob *job = pJob; SSchJob *pJob = job;
if (job->status > 0) { if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) {
if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) { SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob);
qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed return;
return; }
}
if (job->status == JOB_TASK_STATUS_EXECUTING) { while (true) {
scheduleCancelJob(pJob); int32_t ref = atomic_load_32(&pJob->ref);
if (0 == ref) {
break;
} else if (ref > 0) {
usleep(1);
} else {
assert(0);
} }
}
schDropJobAllTasks(job); if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob);
} }
job->subPlans = NULL; // it is a reference to pDag->pSubplans schDropJobAllTasks(pJob);
int32_t numOfLevels = taosArrayGetSize(job->levels);
pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for(int32_t i = 0; i < numOfLevels; ++i) { for(int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(job->levels, i); SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for(int32_t j = 0; j < numOfTasks; ++j) { for(int32_t j = 0; j < numOfTasks; ++j) {
...@@ -1216,12 +1451,15 @@ void scheduleFreeJob(void *pJob) { ...@@ -1216,12 +1451,15 @@ void scheduleFreeJob(void *pJob) {
taosArrayDestroy(pLevel->subTasks); taosArrayDestroy(pLevel->subTasks);
} }
taosHashCleanup(job->execTasks); taosHashCleanup(pJob->execTasks);
taosHashCleanup(job->failTasks); taosHashCleanup(pJob->failTasks);
taosHashCleanup(job->succTasks); taosHashCleanup(pJob->succTasks);
taosArrayDestroy(job->levels);
taosArrayDestroy(pJob->levels);
tfree(pJob->res);
tfree(job); tfree(pJob);
} }
void schedulerDestroy(void) { void schedulerDestroy(void) {
......
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
namespace { namespace {
extern "C" int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
void schtBuildQueryDag(SQueryDag *dag) { void schtBuildQueryDag(SQueryDag *dag) {
uint64_t qId = 0x0000000000000001; uint64_t qId = 0x0000000000000001;
...@@ -188,7 +188,7 @@ void *schtSendRsp(void *param) { ...@@ -188,7 +188,7 @@ void *schtSendRsp(void *param) {
SShellSubmitRspMsg rsp = {0}; SShellSubmitRspMsg rsp = {0};
rsp.affectedRows = 10; rsp.affectedRows = 10;
schProcessRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
} }
...@@ -233,7 +233,7 @@ TEST(queryTest, normalCase) { ...@@ -233,7 +233,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0}; SQueryTableRsp rsp = {0};
code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
...@@ -244,7 +244,7 @@ TEST(queryTest, normalCase) { ...@@ -244,7 +244,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0}; SResReadyRsp rsp = {0};
code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
...@@ -255,7 +255,7 @@ TEST(queryTest, normalCase) { ...@@ -255,7 +255,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0}; SQueryTableRsp rsp = {0};
code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
...@@ -266,7 +266,7 @@ TEST(queryTest, normalCase) { ...@@ -266,7 +266,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0}; SResReadyRsp rsp = {0};
code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
...@@ -275,7 +275,7 @@ TEST(queryTest, normalCase) { ...@@ -275,7 +275,7 @@ TEST(queryTest, normalCase) {
SRetrieveTableRsp rsp = {0}; SRetrieveTableRsp rsp = {0};
rsp.completed = 1; rsp.completed = 1;
rsp.numOfRows = 10; rsp.numOfRows = 10;
code = schProcessRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册