schStatus.c 2.6 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

D
dapan1121 已提交
24 25 26
/*
 * Move a job to `status` and run the handler associated with that status.
 *
 * The new status is first recorded through schUpdateJobStatus(); only if that
 * succeeds is the per-status action executed.
 *
 * pJob    the job being transitioned.
 * status  target status, one of the JOB_TASK_STATUS_* values handled below.
 * param   status-dependent argument:
 *           - JOB_TASK_STATUS_EXEC: passed to schExecJob() as SSchedulerReq*.
 *           - JOB_TASK_STATUS_FAIL / JOB_TASK_STATUS_DROP: optional int32_t*
 *             carrying an error code; NULL is treated as error code 0.
 *           - other statuses: not used.
 *
 * Returns TSDB_CODE_SUCCESS when the switch falls through normally, the result
 * of schProcessOnJobFailure() for the FAIL status and for the `_return` error
 * path, or TSDB_CODE_SCH_STATUS_ERROR for an unrecognised status.
 *
 * NOTE(review): SCH_ERR_JRET is assumed to store a failing result in `code`
 * and jump to `_return`; SCH_RET is assumed to return its argument. Both are
 * defined outside this file section.
 */
int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
  int32_t code = 0;
  SCH_ERR_JRET(schUpdateJobStatus(pJob, status));

  switch (status) {
    case JOB_TASK_STATUS_INIT:
      break;
    case JOB_TASK_STATUS_EXEC:
      SCH_ERR_JRET(schExecJob(pJob, (SSchedulerReq*)param));
      break;
    case JOB_TASK_STATUS_PART_SUCC:
      SCH_ERR_JRET(schProcessOnJobPartialSuccess(pJob));
      break;
    case JOB_TASK_STATUS_SUCC:
      break;
    case JOB_TASK_STATUS_FAIL:
      SCH_RET(schProcessOnJobFailure(pJob, (param ? *(int32_t*)param : 0)));
      break;
    case JOB_TASK_STATUS_DROP: {
      // param is optional here just like in the FAIL case: a NULL pointer
      // must not be dereferenced, so fall back to error code 0.
      int32_t dropCode = (param ? *(int32_t*)param : 0);
      schProcessOnJobDropped(pJob, dropCode);

      // taosRemoveRef() returning non-zero is logged as a failure; the
      // transition itself is still reported as successful.
      if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) {
        SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId);
      } else {
        SCH_JOB_DLOG("job removed from jobRef list, refId:0x%" PRIx64, pJob->refId);
      }
      break;
    }
    default: {
      SCH_JOB_ELOG("unknown job status %d", status);
      SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  // Any failure while updating the status or running its handler is routed
  // through the job failure handler, whose result becomes the return value.
  SCH_RET(schProcessOnJobFailure(pJob, code));
}

D
dapan1121 已提交
64 65 66 67 68 69 70 71
/*
 * Begin an operation of kind `type` on the scheduler job identified by jobId.
 *
 * The job is looked up with schAcquireJob(). When the lookup yields NULL an
 * error is logged and TSDB_CODE_SCH_STATUS_ERROR is reported; `*job` is not
 * written in that case. Otherwise the acquired job is stored in `*job` and
 * the result of schProcessOnOpBegin() is returned.
 *
 * This function does not release the acquired job, even when
 * schProcessOnOpBegin() reports an error; the caller owns the handle stored
 * in `*job`.
 *
 * jobId  identifier passed to schAcquireJob().
 * job    out-parameter receiving the acquired job; must not be NULL.
 * type   operation type forwarded to schProcessOnOpBegin().
 * pReq   request forwarded to schProcessOnOpBegin().
 */
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) {
  SSchJob* pJob = schAcquireJob(jobId);

  if (pJob == NULL) {
    qError("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  // Hand the job to the caller before running the begin hook so the caller
  // holds it regardless of the hook's outcome.
  *job = pJob;

  int32_t beginCode = schProcessOnOpBegin(pJob, type, pReq);
  SCH_RET(beginCode);
}

D
dapan1121 已提交
76 77 78
/*
 * Finish an operation of kind `type` on `pJob` and release the job.
 *
 * With a NULL job there is nothing to finish or release, and errCode is
 * handed straight back through SCH_RET. Otherwise schProcessOnOpEnd() is
 * invoked, the result code is chosen, and the job is released via
 * schReleaseJob(pJob->refId).
 *
 * pJob     job the operation ran on; may be NULL.
 * type     operation type forwarded to schProcessOnOpEnd().
 * pReq     request forwarded to schProcessOnOpEnd().
 * errCode  outcome of the operation as seen by the caller.
 *
 * Returns errCode, except when errCode is TSDB_CODE_SCH_IGNORE_ERROR, in
 * which case the job's own errCode field is returned instead.
 */
int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
  if (pJob == NULL) {
    SCH_RET(errCode);
  }

  schProcessOnOpEnd(pJob, type, pReq, errCode);

  // The job's errCode is read after the end hook has run and before the job
  // is released, matching the order in which the fields are touched below.
  int32_t result = (errCode == TSDB_CODE_SCH_IGNORE_ERROR) ? pJob->errCode : errCode;

  schReleaseJob(pJob->refId);

  return result;
}
D
dapan1121 已提交
93 94