/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "catalog.h" #include "command.h" #include "query.h" #include "schInt.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) { int32_t code = 0; SCH_ERR_JRET(schUpdateJobStatus(pJob, status)); switch (status) { case JOB_TASK_STATUS_INIT: break; case JOB_TASK_STATUS_EXEC: SCH_ERR_JRET(schExecJob(pJob, (SSchedulerReq*)param)); break; case JOB_TASK_STATUS_PART_SUCC: SCH_ERR_JRET(schProcessOnJobPartialSuccess(pJob)); break; case JOB_TASK_STATUS_SUCC: break; case JOB_TASK_STATUS_FAIL: SCH_RET(schProcessOnJobFailure(pJob, (int32_t)param)); break; case JOB_TASK_STATUS_DROP: SCH_ERR_JRET(schProcessOnJobDropped(pJob, (int32_t)param)); if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) { SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId); } else { SCH_JOB_DLOG("job removed from jobRef list, refId:0x%" PRIx64, pJob->refId); } break; default: { SCH_JOB_ELOG("unknown job status %d", status); SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); } } return TSDB_CODE_SUCCESS; _return: SCH_RET(schProcessOnJobFailure(pJob, code)); } int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) { SSchJob *pJob = schAcquireJob(jobId); if (NULL == pJob) { qError("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } *job = pJob; SCH_RET(schProcessOnOpBegin(pJob, type, pReq)); } void schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { if (NULL == pJob) { return; } schProcessOnOpEnd(pJob, type, pReq, errCode); schReleaseJob(pJob->refId); }