提交 b5207239 编写于 作者: D dapan1121

enh: refactor scheduler code

上级 3ffd9759
......@@ -82,10 +82,11 @@ typedef struct SSchedulerReq {
const char *sql;
int64_t startTs;
schedulerExecFp execFp;
void* execParam;
schedulerFetchFp fetchFp;
void* cbParam;
schedulerChkKillFp chkKillFp;
void* chkKillParam;
SQueryResult* pQueryRes;
SExecResult* pExecRes;
char** pFetchRes;
} SSchedulerReq;
......
......@@ -388,10 +388,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719)
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A)
#define TSDB_CODE_QRY_TASK_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x071B)
//json
#define TSDB_CODE_QRY_JSON_IN_ERROR TAOS_DEF_ERROR_CODE(0, 0x071C)
#define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x071D)
#define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x071E)
#define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x071F)
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)
......
......@@ -1368,9 +1368,9 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
SSchedulerReq req = {
.syncReq = true,
.
.pFetchRes = &pResInfo->pData,
};
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
return NULL;
......
......@@ -863,7 +863,12 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
}
}
schedulerFetchRowsA(pRequest->body.queryJob, fetchCallback, pRequest);
SSchedulerReq req = {
.syncReq = false,
.fetchFp = fetchCallback,
.execParam = pRequest,
};
schedulerFetchRows(pRequest->body.queryJob, &req);
}
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
......
......@@ -52,15 +52,9 @@ typedef enum {
SCH_OP_NULL = 0,
SCH_OP_EXEC,
SCH_OP_FETCH,
SCH_OP_GET_STATUS,
} SCH_OP_TYPE;
typedef enum {
SCH_EVENT_BEGIN_OP = 1,
SCH_EVENT_END_OP,
SCH_EVENT_MSG,
SCH_EVENT_DROP,
} SCH_EVENT_TYPE;
typedef struct SSchTrans {
void *pTrans;
void *pHandle;
......@@ -108,7 +102,7 @@ typedef struct SSchResInfo {
void** fetchRes;
schedulerExecFp execFp;
schedulerFetchFp fetchFp;
void* userParam;
void* cbParam;
} SSchResInfo;
typedef struct SSchOpEvent {
......@@ -358,9 +352,10 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define SCH_SET_ERRNO(_err) do { if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { terrno = (_err); } } while (0)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); goto _return; } } while (0)
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
......
......@@ -51,7 +51,12 @@ _return:
SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
}
bool schJobDone(SSchJob *pJob) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
return (status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_DROP ||
status == JOB_TASK_STATUS_SUCC);
}
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
......@@ -59,13 +64,14 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
*pStatus = status;
}
if (schJobDone(pJob)) {
return true;
}
if ((*pJob->chkKillFp)(pJob->chkKillParam)) {
schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
return true;
}
return (status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_DROP ||
status == JOB_TASK_STATUS_SUCC);
}
}
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
......@@ -77,10 +83,6 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
oriStatus = SCH_GET_JOB_STATUS(pJob);
if (oriStatus == newStatus) {
if (newStatus == JOB_TASK_STATUS_DROP) {
SCH_ERR_JRET(TSDB_CODE_SCH_JOB_IS_DROPPING);
}
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}
......@@ -140,7 +142,11 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
_return:
SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
if (TSDB_CODE_SCH_IGNORE_ERROR == code) {
SCH_JOB_DLOG("ignore job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
} else {
SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
}
SCH_RET(code);
}
......@@ -360,7 +366,7 @@ _return:
}
int32_t schDumpJobExecRes(SSchJob* pJob, SQueryResult* pRes) {
int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) {
pRes->code = atomic_load_32(&pJob->errCode);
pRes->numOfRows = pJob->resNumOfRows;
pRes->res = pJob->execRes;
......@@ -372,7 +378,7 @@ int32_t schDumpJobExecRes(SSchJob* pJob, SQueryResult* pRes) {
int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) {
int32_t code = 0;
if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCC));
SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
}
while (true) {
......@@ -451,9 +457,6 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
}
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
// if already FAILED, no more processing
SCH_ERR_RET(schUpdateJobStatus(pJob, status));
schUpdateJobErrCode(pJob, errCode);
int32_t code = atomic_load_32(&pJob->errCode);
......@@ -463,13 +466,17 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
schPostJobRes(pJob, 0);
SCH_RET(code);
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
// Note: no more task error processing, handled in function internal
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR;
}
schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAIL, errCode);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_SCH_IGNORE_ERROR;
}
// Note: no more error processing, handled in function internal
......@@ -477,19 +484,10 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROP, errCode));
}
// Note: no more task error processing, handled in function internal
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
int32_t code = 0;
SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC));
schPostJobRes(pJob, SCH_OP_EXEC);
return TSDB_CODE_SUCCESS;
_return:
SCH_RET(schProcessOnJobFailure(pJob, code));
}
void schProcessOnDataFetched(SSchJob *pJob) {
......@@ -570,7 +568,7 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
int32_t schLaunchJob(SSchJob *pJob) {
if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) {
SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->resData));
SCH_ERR_RET(schJobStatusEnter(&pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
} else {
SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
SCH_ERR_RET(schLaunchLevelTasks(pJob, level));
......@@ -586,12 +584,6 @@ void schDropJobAllTasks(SSchJob *pJob) {
// schDropTaskInHashList(pJob, pJob->failTasks);
}
int32_t schCancelJob(SSchJob *pJob) {
// TODO
return TSDB_CODE_SUCCESS;
// TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
}
void schFreeJobImpl(void *job) {
if (NULL == job) {
return;
......@@ -603,10 +595,6 @@ void schFreeJobImpl(void *job) {
qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
if (pJob->status == JOB_TASK_STATUS_EXEC) {
schCancelJob(pJob);
}
schDropJobAllTasks(pJob);
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
......@@ -655,34 +643,21 @@ int32_t schJobFetchRows(SSchJob *pJob) {
int32_t code = 0;
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
SCH_ERR_JRET(schLaunchFetchTask(pJob));
tsem_wait(&pJob->rspSem);
}
SCH_ERR_JRET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
_return:
schEndOperation(pJob);
SCH_RET(code);
}
int32_t schJobFetchRowsA(SSchJob *pJob) {
int32_t code = 0;
if (pJob->attr.explainMode == EXPLAIN_MODE_STATIC) {
SCH_ERR_RET(schLaunchFetchTask(pJob));
if (pJob->opStatus.syncReq) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
schPostJobRes(pJob, SCH_OP_FETCH);
}
} else {
schPostJobRes(pJob, SCH_OP_FETCH);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_RET(schLaunchFetchTask(pJob));
return TSDB_CODE_SUCCESS;
SCH_RET(code);
}
int32_t schInitJob(SSchJob **pSchJob, SSchedulerReq *pReq) {
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
int32_t code = 0;
int64_t refId = -1;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
......@@ -698,7 +673,7 @@ int32_t schInitJob(SSchJob **pSchJob, SSchedulerReq *pReq) {
pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp;
pJob->userRes.userParam = pReq->execParam;
pJob->userRes.cbParam = pReq->cbParam;
pJob->opStatus.op = SCH_OP_EXEC;
pJob->opStatus.syncReq = pReq->syncReq;
......@@ -730,35 +705,28 @@ int32_t schInitJob(SSchJob **pSchJob, SSchedulerReq *pReq) {
tsem_init(&pJob->rspSem, 0, 0);
refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
if (pJob->refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno);
}
atomic_add_fetch_32(&schMgmt.jobNum, 1);
if (NULL == schAcquireJob(refId)) {
SCH_JOB_ELOG("schAcquireJob job failed, refId:0x%" PRIx64, refId);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->refId = refId;
*pJobId = pJob->refId;
SCH_JOB_DLOG("job refId:0x%" PRIx64" created", pJob->refId);
*pSchJob = pJob;
return TSDB_CODE_SUCCESS;
_return:
if (NULL == pJob) {
qDestroyQueryPlan(pReq->pDag);
} else if (refId < 0) {
} else if (pJob->refId < 0) {
schFreeJobImpl(pJob);
} else {
taosRemoveRef(schMgmt.jobRef, refId);
taosRemoveRef(schMgmt.jobRef, pJob->refId);
}
SCH_RET(code);
......@@ -768,7 +736,7 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
int32_t code = 0;
qDebug("QID:0x%" PRIx64 " sch job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
SCH_ERR_JRET(schLaunchJob(pJob));
SCH_ERR_RET(schLaunchJob(pJob));
if (pReq->syncReq) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
......@@ -778,83 +746,148 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
return TSDB_CODE_SUCCESS;
_return:
SCH_RET(schProcessOnJobFailure(pJob, code));
}
void schProcessOnOpEnd(SSchJob *pJob) {
int32_t op = atomic_load_32(&pJob->opStatus.op);
if (SCH_OP_NULL == op) {
SCH_JOB_DLOG("job already not in any operation, status:%s", jobTaskStatusStr(pJob->status));
return;
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
switch (type) {
case SCH_OP_EXEC:
int32_t op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
}
if (pReq) {
schDumpJobExecRes(pJob, pReq->pExecRes);
}
break;
case SCH_OP_FETCH:
int32_t op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
}
break;
case SCH_OP_GET_STATUS:
errCode = TSDB_CODE_SUCCESS;
break;
default:
break;
}
atomic_store_32(&pJob->opStatus.op, SCH_OP_NULL);
SCH_JOB_DLOG("job end %s operation", schGetOpStr(op));
if (errCode) {
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, errCode);
}
SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
}
int32_t schProcessOnOpBegin(SSchJob* pJob, SSchEvent* pEvent) {
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) {
int32_t code = 0;
int8_t status = 0;
SSchOpEvent* pInfo = (SSchOpEvent*)pEvent->info;
SCH_OP_TYPE type, bool sync;
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("abort op %s cause of job need to stop", schGetOpStr(type));
SCH_ERR_JRET(pJob->errCode);
SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
}
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->opStatus.syncReq = sync;
switch (type) {
case SCH_OP_EXEC:
SCH_ERR_JRET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->opStatus.syncReq = pReq->syncReq;
break;
case SCH_OP_FETCH:
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->opStatus.syncReq = pReq->syncReq;
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (status != JOB_TASK_STATUS_PART_SUCC) {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->userRes.fetchRes = pReq->pFetchRes;
pJob->userRes.fetchFp = pReq->fetchFp;
pJob->userRes.cbParam = pReq->cbParam;
break;
case SCH_OP_GET_STATUS:
if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
qDebug("job not initialized or not executable job, refId:0x%" PRIx64, pJob->refId);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
break;
default:
SCH_JOB_ELOG("unknown operation type %d", type);
SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
return TSDB_CODE_SUCCESS;
}
_return:
schEndOperation(pJob);
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
if (pTask) {
SCH_UNLOCK_TASK(pTask);
}
SCH_RET(code);
if (errCode) {
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, errCode);
}
if (pJob) {
schReleaseJob(pJob->refId);
}
}
int32_t schHandleJobEvent(SSchJob* pJob, SSchEvent* pEvent) {
switch (pEvent->event) {
case SCH_EVENT_BEGIN_OP:
schProcessOnOpBegin(pJob, pEvent);
case SCH_EVENT_END_OP:
schProcessOnOpEnd(pJob);
int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_t rId, uint64_t tId) {
int32_t code = 0;
int8_t status = 0;
SSchTask *pTask = NULL;
SSchJob *pJob = schAcquireJob(rId);
if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
}
}
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}
SCH_ERR_JRET(schGetTaskInJob(pJob, tId, &pTask));
SCH_LOCK_TASK(pTask);
return TSDB_CODE_SUCCESS;
_return:
if (pTask) {
SCH_UNLOCK_TASK(pTask);
}
if (pJob) {
schReleaseJob(rId);
}
SCH_RET(code);
}
......@@ -88,9 +88,21 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
}
// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
int32_t rspCode) {
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
char *msg = pMsg->pData;
int32_t msgSize = pMsg->len;
int32_t msgType = pMsg->msgType;
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId));
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) {
SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode));
}
switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP: {
......@@ -362,65 +374,24 @@ _return:
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
int32_t msgType = pMsg->msgType;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
SSchTask *pTask = NULL;
SSchJob *pJob = NULL;
SSchJob *pJob = schAcquireJob(pParam->refId);
if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:0x%" PRIx64,
pParam->queryId, pParam->taskId, pParam->refId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
}
SCH_ERR_JRET(schGetTaskInJob(pJob, pParam->taskId, &pTask));
SCH_LOCK_TASK(pTask);
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
if (pParam->execId != pTask->execId) {
SCH_TASK_DLOG("execId %d mis-match current execId %d", pParam->execId, pTask->execId);
goto _return;
}
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execId));
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp will not be processed cause of job status %s, rspCode:0x%x", jobTaskStatusStr(status), rspCode);
code = atomic_load_32(&pJob->errCode);
goto _return;
}
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
SCH_TASK_DLOG("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode));
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) {
code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode);
goto _return;
}
SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId));
schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode);
code = schHandleResponseMsg(pJob, pTask, pParam->execId, pMsg, rspCode);
pMsg->pData = NULL;
_return:
if (pTask) {
if (code) {
schProcessOnTaskFailure(pJob, pTask, code);
}
SCH_UNLOCK_TASK(pTask);
}
if (pJob) {
schReleaseJob(pParam->refId);
}
schProcessOnCbEnd(pJob, pTask, code);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(param);
SCH_TASK_DLOG("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode));
SCH_RET(code);
}
......@@ -451,6 +422,37 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
}
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
if (code) {
qError("hb rsp error:%s", tstrerror(code));
SCH_ERR_JRET(code);
}
if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
qError("invalid hb rsp msg, size:%d", pMsg->len);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchTrans trans = {0};
trans.pTrans = pParam->pTrans;
trans.pHandle = pMsg->handle;
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
_return:
tFreeSSchedulerHbRsp(&rsp);
taosMemoryFree(param);
SCH_RET(code);
}
int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans, void **pParam) {
if (!isHb) {
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
......@@ -692,36 +694,6 @@ _return:
SCH_RET(code);
}
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
if (code) {
qError("hb rsp error:%s", tstrerror(code));
SCH_ERR_JRET(code);
}
if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
qError("invalid hb rsp msg, size:%d", pMsg->len);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchTrans trans = {0};
trans.pTrans = pParam->pTrans;
trans.pHandle = pMsg->handle;
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
_return:
tFreeSSchedulerHbRsp(&rsp);
taosMemoryFree(param);
SCH_RET(code);
}
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
int32_t code = 0;
int32_t msgType = TDMT_SCH_LINK_BROKEN;
......
......@@ -21,56 +21,66 @@
#include "tref.h"
#include "trpc.h"
SSchStatusFps gSchJobFps[JOB_TASK_STATUS_MAX] = {
{JOB_TASK_STATUS_NULL, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_INIT, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_EXEC, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_PART_SUCC, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_SUCC, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_FAIL, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_DROP, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
};
SSchStatusFps gSchTaskFps[JOB_TASK_STATUS_MAX] = {
{JOB_TASK_STATUS_NULL, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_INIT, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_EXEC, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_PART_SUCC, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_SUCC, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_FAIL, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_DROP, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
};
int32_t schSwitchJobStatus(SSchJob** job, int32_t status, void* param) {
SCH_ERR_RET(schUpdateJobStatus(*job, status));
int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
int32_t code = 0;
SCH_ERR_JRET(schUpdateJobStatus(pJob, status));
switch (status) {
case JOB_TASK_STATUS_INIT:
SCH_RET(schInitJob(job, param));
break;
case JOB_TASK_STATUS_EXEC:
SCH_RET(schExecJob(job, param));
SCH_ERR_JRET(schExecJob(pJob, (SSchedulerReq*)param));
break;
case JOB_TASK_STATUS_PART_SUCC:
SCH_ERR_JRET(schProcessOnJobPartialSuccess(pJob));
break;
case JOB_TASK_STATUS_SUCC:
break;
case JOB_TASK_STATUS_FAIL:
SCH_RET(schProcessOnJobFailure(pJob, (int32_t)param));
break;
case JOB_TASK_STATUS_DROP:
SCH_ERR_JRET(schProcessOnJobDropped(pJob, (int32_t)param));
if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) {
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId);
} else {
SCH_JOB_DLOG("job removed from jobRef list, refId:0x%" PRIx64, pJob->refId);
}
break;
default: {
SSchJob* pJob = *job;
SCH_JOB_ELOG("enter unknown job status %d", status);
SCH_JOB_ELOG("unknown job status %d", status);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
}
return TSDB_CODE_SUCCESS;
_return:
SCH_RET(schProcessOnJobFailure(pJob, code));
}
int32_t schHandleOpBeginEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) {
SSchEvent event = {0};
event.event = SCH_EVENT_BEGIN_OP;
SSchOpEvent opEvent = {0};
opEvent.type = type;
opEvent.begin = true;
opEvent.pReq = pReq;
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) {
SSchJob *pJob = schAcquireJob(jobId);
if (NULL == pJob) {
qError("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
*job = pJob;
SCH_ERR_RET(schHandleJobEvent(pJob, &event));
SCH_RET(schProcessOnOpBegin(pJob, type, pReq));
}
void schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
if (NULL == pJob) {
return;
}
schProcessOnOpEnd(pJob, type, pReq, errCode);
schReleaseJob(pJob->refId);
}
......@@ -102,7 +102,7 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
if (execId != pTask->execId) { // ignore it
SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId);
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
return TSDB_CODE_SUCCESS;
......@@ -135,18 +135,26 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR;
}
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_TASK_ELOG("task already not in EXEC status, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
SCH_LOG_TASK_WAIT_TS(pTask);
} else {
SCH_LOG_TASK_END_TS(pTask);
}
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
SCH_RET(atomic_load_32(&pJob->errCode));
}
bool needRetry = false;
bool moved = false;
......@@ -155,16 +163,11 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));
SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));
if (!needRetry) {
SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);
if (SCH_IS_WAIT_ALL_JOB(pJob)) {
......@@ -181,14 +184,12 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
}
}
} else {
SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));
SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));
return TSDB_CODE_SUCCESS;
}
_return:
SCH_RET(schProcessOnJobFailure(pJob, errCode));
SCH_RET(code);
}
......@@ -204,9 +205,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);
SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));
SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
if (parentNum == 0) {
......@@ -225,9 +226,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
}
if (pTask->level->taskFailed > 0) {
SCH_RET(schProcessOnJobFailure(pJob, 0));
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, 0));
} else {
SCH_RET(schProcessOnJobPartialSuccess(pJob));
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
}
} else {
pJob->resNode = pTask->succeedAddr;
......@@ -235,7 +236,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
pJob->fetchTask = pTask;
SCH_RET(schProcessOnJobPartialSuccess(pJob));
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
}
/*
......@@ -269,10 +270,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
return TSDB_CODE_SUCCESS;
_return:
SCH_RET(schProcessOnJobFailure(pJob, code));
}
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
......@@ -280,15 +277,14 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
SCH_LOCK_TASK(pTask);
if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status &&
pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) {
SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR);
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));
}
SCH_UNLOCK_TASK(pTask);
return TSDB_CODE_SUCCESS;
}
......@@ -298,7 +294,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
if ((pTask->execId + 1) >= pTask->maxExecTimes) {
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId);
schProcessOnJobFailure(pJob, rspCode);
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)rspCode);
return TSDB_CODE_SUCCESS;
}
......@@ -353,9 +349,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
_return:
code = schProcessOnTaskFailure(pJob, pTask, code);
SCH_RET(code);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) {
......@@ -372,9 +366,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
_return:
schProcessOnTaskFailure(pJob, pTask, code);
SCH_RET(code);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
......@@ -679,49 +671,39 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList);
SSchTask *pTask = NULL;
SSchJob *pJob = NULL;
qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, pEpId->ep.port);
for (int32_t i = 0; i < taskNum; ++i) {
STaskStatus *taskStatus = taosArrayGet(pStatusList, i);
STaskStatus *pStatus = taosArrayGet(pStatusList, i);
int32_t code = 0;
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s",
taskStatus->queryId, taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status));
pStatus->queryId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));
SSchJob *pJob = schAcquireJob(taskStatus->refId);
if (NULL == pJob) {
qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
taskStatus->queryId, taskStatus->taskId);
// TODO DROP TASK FROM SERVER!!!!
if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
continue;
}
pTask = NULL;
schGetTaskInJob(pJob, taskStatus->taskId, &pTask);
if (NULL == pTask) {
// TODO DROP TASK FROM SERVER!!!!
schReleaseJob(taskStatus->refId);
if (pStatus->execId != pTask->execId) {
//TODO
SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
schProcessOnCbEnd(pJob, pTask, 0);
continue;
}
if (taskStatus->execId != pTask->execId) {
// TODO DROP TASK FROM SERVER!!!!
SCH_TASK_DLOG("EID %d in hb rsp mis-match", taskStatus->execId);
schReleaseJob(taskStatus->refId);
continue;
}
if (taskStatus->status == JOB_TASK_STATUS_FAIL) {
if (pStatus->status == JOB_TASK_STATUS_FAIL) {
// RECORD AND HANDLE ERROR!!!!
schReleaseJob(taskStatus->refId);
schProcessOnCbEnd(pJob, pTask, 0);
continue;
}
if (taskStatus->status == JOB_TASK_STATUS_INIT) {
schRescheduleTask(pJob, pTask);
if (pStatus->status == JOB_TASK_STATUS_INIT) {
code = schRescheduleTask(pJob, pTask);
}
schReleaseJob(taskStatus->refId);
schProcessOnCbEnd(pJob, pTask, code);
}
return TSDB_CODE_SUCCESS;
......@@ -739,9 +721,8 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SCH_LOG_TASK_START_TS(pTask);
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status));
SCH_RET(atomic_load_32(&pJob->errCode));
SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
// NOTE: race condition: the task should be put into the hash table before send msg to server
......
......@@ -43,6 +43,8 @@ char* schGetOpStr(SCH_OP_TYPE type) {
return "EXEC";
case SCH_OP_FETCH:
return "FETCH";
case SCH_OP_GET_STATUS:
return "GET STATUS";
default:
return "UNKNOWN";
}
......
......@@ -73,93 +73,39 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
int32_t code = 0;
SSchJob *pJob = NULL;
*pJobId = 0;
SCH_ERR_JRET(schInitJob(pJobId, pReq));
SCH_ERR_RET(schSwitchJobStatus(&pJob, JOB_TASK_STATUS_INIT, pReq));
SCH_ERR_JRET(schHandleOpBeginEvent(*pJobId, &pJob, SCH_OP_EXEC, pReq));
SCH_ERR_RET(schHandleOpBeginEvent(pJob, SCH_OP_EXEC, pReq));
SCH_ERR_RET(schSwitchJobStatus(&pJob, JOB_TASK_STATUS_EXEC, pReq));
SCH_ERR_RET(schHandleOpEndEvent(pJob, SCH_OP_EXEC, pReq));
SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_INIT, pReq));
*pJobId = pJob->refId;
SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_EXEC, pReq));
_return:
schDumpJobExecRes(pJob, pReq->pQueryRes);
schReleaseJob(pJob->refId);
return code;
SCH_RET(schHandleOpEndEvent(pJob, SCH_OP_EXEC, pReq, code));
}
int32_t schedulerFetchRows(int64_t job, SSchedulerReq *pReq) {
int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq) {
qDebug("scheduler %s fetch rows start", pReq->syncReq ? "SYNC" : "ASYNC");
int32_t code = 0;
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_ERR_RET(schHandleOpBeginEvent(pJob, SCH_OP_FETCH, pReq));
SCH_ERR_RET(schBeginOperation(pJob, SCH_OP_FETCH, true));
pJob->userRes.fetchRes = pData;
code = schJobFetchRows(pJob);
schReleaseJob(job);
SCH_RET(code);
}
void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param) {
qDebug("scheduler async fetch rows start");
SSchJob *pJob = NULL;
int32_t code = 0;
if (NULL == fp || NULL == param) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_JRET(schHandleOpBeginEvent(jobId, &pJob, SCH_OP_FETCH, pReq));
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qError("acquire sch job from job list failed, may be dropped, jobId:0x%" PRIx64, job);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_ERR_JRET(schBeginOperation(pJob, SCH_OP_FETCH, false));
pJob->userRes.fetchFp = fp;
pJob->userRes.userParam = param;
SCH_ERR_JRET(schJobFetchRowsA(pJob));
SCH_ERR_JRET(schJobFetchRows(pJob));
_return:
if (code) {
fp(NULL, param, code);
}
schReleaseJob(job);
SCH_RET(schHandleOpEndEvent(pJob, SCH_OP_FETCH, pReq, code));
}
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) {
int32_t code = 0;
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qDebug("acquire job from jobRef list failed, may not started or dropped, refId:0x%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SSchJob *pJob = NULL;
if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
qDebug("job not initialized or not executable job, refId:0x%" PRIx64, job);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_ERR_JRET(schHandleOpBeginEvent(jobId, &pJob, SCH_OP_GET_STATUS, NULL));
for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
......@@ -176,23 +122,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
_return:
schReleaseJob(job);
SCH_RET(code);
}
int32_t scheduleCancelJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
int32_t code = schCancelJob(pJob);
schReleaseJob(job);
SCH_RET(code);
SCH_RET(schHandleOpEndEvent(pJob, SCH_OP_GET_STATUS, NULL, code));
}
void schedulerStopQueryHb(void *pTrans) {
......@@ -203,33 +133,23 @@ void schedulerStopQueryHb(void *pTrans) {
schCleanClusterHb(pTrans);
}
void schedulerFreeJob(int64_t* job, int32_t errCode) {
if (0 == *job) {
void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
if (0 == *jobId) {
return;
}
SSchJob *pJob = schAcquireJob(*job);
SSchJob *pJob = schAcquireJob(*jobId);
if (NULL == pJob) {
qError("acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *job);
*job = 0;
qError("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
return;
}
int32_t code = schProcessOnJobDropped(pJob, errCode);
if (TSDB_CODE_SCH_JOB_IS_DROPPING == code) {
SCH_JOB_DLOG("sch job is already dropping, refId:0x%" PRIx64, *job);
*job = 0;
if (schJobDone(pJob)) {
return;
}
SCH_JOB_DLOG("start to remove job from jobRef list, refId:0x%" PRIx64, *job);
if (taosRemoveRef(schMgmt.jobRef, *job)) {
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, *job);
}
schReleaseJob(*job);
*job = 0;
schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)errCode);
*jobId = 0;
}
void schedulerDestroy(void) {
......
......@@ -513,7 +513,7 @@ void* schtRunJobThread(void *aa) {
req.pDag = &dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.execParam = &queryDone;
req.cbParam = &queryDone;
code = schedulerExecJob(&req, &queryJobRefId);
assert(code == 0);
......@@ -665,7 +665,7 @@ TEST(queryTest, normalCase) {
req.pDag = &dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.execParam = &queryDone;
req.cbParam = &queryDone;
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -769,7 +769,7 @@ TEST(queryTest, readyFirstCase) {
req.pDag = &dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.execParam = &queryDone;
req.cbParam = &queryDone;
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -876,7 +876,7 @@ TEST(queryTest, flowCtrlCase) {
req.pDag = &dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.execParam = &queryDone;
req.cbParam = &queryDone;
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -989,7 +989,7 @@ TEST(insertTest, normalCase) {
req.pDag = &dag;
req.sql = "insert into tb values(now,1)";
req.execFp = schtQueryCb;
req.execParam = NULL;
req.cbParam = NULL;
code = schedulerExecJob(&req, &insertJobRefId, &res);
ASSERT_EQ(code, 0);
......
......@@ -393,6 +393,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_NOT_EXIST, "Job not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_STATUS_ERROR, "Task status error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_ERROR, "Json not support in in/notin operator")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR, "Json not support in this place")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册