提交 8ffb5012 编写于 作者: D dapan1121

enh: enhance stop query

上级 65c318db
......@@ -808,11 +808,16 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*)param;
pRequest->code = code;
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
if (pResult) {
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
}
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
if (pResult) {
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
}
......
......@@ -434,6 +434,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
#ifdef __cplusplus
......
......@@ -758,6 +758,17 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
return TSDB_CODE_SUCCESS;
}
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) {
if (pReq->syncReq) {
return;
}
if (pReq->execFp) {
(*pReq->execFp)(NULL, pReq->cbParam, errCode);
} else if (pReq->fetchFp) {
(*pReq->fetchFp)(NULL, pReq->cbParam, errCode);
}
}
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
int32_t op = 0;
......@@ -796,17 +807,13 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) {
int32_t code = 0;
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
int8_t status = SCH_GET_JOB_STATUS(pJob);
switch (type) {
case SCH_OP_EXEC:
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
......@@ -817,11 +824,16 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
case SCH_OP_FETCH:
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->userRes.fetchRes = pReq->pFetchRes;
pJob->userRes.fetchFp = pReq->fetchFp;
pJob->userRes.cbParam = pReq->cbParam;
pJob->opStatus.syncReq = pReq->syncReq;
if (!SCH_JOB_NEED_FETCH(pJob)) {
......@@ -834,10 +846,6 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->userRes.fetchRes = pReq->pFetchRes;
pJob->userRes.fetchFp = pReq->fetchFp;
pJob->userRes.cbParam = pReq->cbParam;
break;
case SCH_OP_GET_STATUS:
if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
......@@ -850,6 +858,11 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -77,6 +77,7 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
int32_t code = errCode;
if (NULL == pJob) {
schDirectPostJobRes(pReq, errCode);
SCH_RET(code);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册