schJob.c 28.8 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
D
dapan1121 已提交
19
#include "schInt.h"
D
dapan1121 已提交
20 21 22 23
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

D
dapan1121 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
/*
 * Record errCode as the job's error code.
 *
 * A success code is ignored. When the job has no error yet, errCode is
 * installed with a compare-and-swap. Otherwise an already stored error for
 * which NEED_CLIENT_HANDLE_ERROR() holds is never overwritten, and errCode only
 * replaces the stored error when NEED_CLIENT_HANDLE_ERROR(errCode) holds.
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (errCode == TSDB_CODE_SUCCESS) {
    return;
  }

  int32_t current = atomic_load_32(&pJob->errCode);
  bool    updated = false;

  if (current == TSDB_CODE_SUCCESS) {
    // Try to be the first writer; if another thread won the race, re-read its code.
    updated = (atomic_val_compare_exchange_32(&pJob->errCode, current, errCode) == current);
    if (!updated) {
      current = atomic_load_32(&pJob->errCode);
    }
  }

  if (!updated) {
    if (NEED_CLIENT_HANDLE_ERROR(current) || !NEED_CLIENT_HANDLE_ERROR(errCode)) {
      return;
    }

    atomic_store_32(&pJob->errCode, errCode);
  }

  SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
}

D
dapan1121 已提交
53 54
/* Report whether the job status is terminal: FAIL, DROP or SUCC. */
bool schJobDone(SSchJob *pJob) {
  switch (SCH_GET_JOB_STATUS(pJob)) {
    case JOB_TASK_STATUS_FAIL:
    case JOB_TASK_STATUS_DROP:
    case JOB_TASK_STATUS_SUCC:
      return true;
    default:
      return false;
  }
}
D
dapan1121 已提交
58

D
dapan1121 已提交
59 60 61 62 63 64
/*
 * Decide whether the job should stop making progress.
 *
 * pStatus, when non-NULL, receives the job status read on entry.
 * Returns true when the job is already in a terminal status (schJobDone), or
 * when the user-supplied kill check reports the query as killed; in the latter
 * case the job error code is set to TSDB_CODE_TSC_QUERY_KILLED.
 */
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (pStatus) {
    *pStatus = status;
  }

  if (schJobDone(pJob)) {
    return true;
  }

  // chkKillFp is copied verbatim from the scheduler request in schInitJob and
  // nothing there guarantees it is set, so guard the indirect call.
  if (pJob->chkKillFp && (*pJob->chkKillFp)(pJob->chkKillParam)) {
    schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
    return true;
  }

  return false;
}

D
dapan1121 已提交
77
/*
 * Atomically move the job from its current status to newStatus.
 *
 * Allowed transitions:
 *   NULL      -> INIT
 *   INIT      -> EXEC | DROP
 *   EXEC      -> PART_SUCC | FAIL | DROP
 *   PART_SUCC -> FAIL | SUCC | DROP | EXEC
 *   SUCC/FAIL -> DROP
 * Returns:
 *   - TSDB_CODE_SCH_IGNORE_ERROR when the job is already in newStatus;
 *   - TSDB_CODE_QRY_JOB_FREED when the job is in DROP;
 *   - TSDB_CODE_APP_ERROR for any other transition or an unknown status.
 * If another thread changes the status between the read and the
 * compare-and-swap, the whole check is retried.
 */
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    if (newStatus == oriStatus) {
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    bool allowed = false;
    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        allowed = (JOB_TASK_STATUS_INIT == newStatus);
        break;
      case JOB_TASK_STATUS_INIT:
        allowed = (JOB_TASK_STATUS_EXEC == newStatus || JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_EXEC:
        allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus || JOB_TASK_STATUS_FAIL == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_PART_SUCC:
        allowed = (JOB_TASK_STATUS_FAIL == newStatus || JOB_TASK_STATUS_SUCC == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus || JOB_TASK_STATUS_EXEC == newStatus);
        break;
      case JOB_TASK_STATUS_SUCC:
      case JOB_TASK_STATUS_FAIL:
        allowed = (JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_DROP:
        // A dropped job never changes status again.
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;
      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        break;
    }

    if (!allowed) {
      SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
    }

    // Retry from the top if the status changed underneath us.
    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      return TSDB_CODE_SUCCESS;
    }
  }

_return:

  if (TSDB_CODE_SCH_IGNORE_ERROR == code) {
    SCH_JOB_DLOG("ignore job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  } else {
    SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  }

  SCH_RET(code);
}

D
dapan1121 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
/*
 * Link every task to its child and parent tasks following the subplan graph.
 *
 * planToTask maps a SSubplan pointer to the SSchTask built for that subplan.
 * For each task, pTask->children and pTask->parents are filled from
 * pPlan->pChildren and pPlan->pParents.
 *
 * Structural checks:
 *   - tasks on the lowest level (pJob->levelIdx) must have no children;
 *   - tasks on level 0 must have no parents, and tasks on every other level
 *     must have at least one.
 * For query jobs, level 0 may hold at most one task. If that task is not a
 * MODIFY subplan, the job is marked as needing a fetch.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR for an inconsistent plan and
 * TSDB_CODE_OUT_OF_MEMORY when an array cannot be allocated or grown.
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("children info, the %d child TID 0x%" PRIx64, n, (*childTask)->taskId);
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("parents info, the %d parent TID 0x%" PRIx64, n, (*parentTask)->taskId);
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  // NOTE(review): assumes pJob->levels is non-empty; schValidateAndBuildJob rejects levelNum <= 0.
  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob)) {
    if (pLevel->taskNum > 1) {
      SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
    if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType) {
      pJob->attr.needFetch = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
245
/*
 * Record pTask in pJob->dataSrcTasks when it is a data-bound query task
 * (SCH_IS_DATA_BIND_QRY_TASK). Other tasks are ignored.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot
 * grow. The push result used to be ignored, which silently dropped the task.
 */
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
    SCH_JOB_ELOG("taosArrayPush task to dataSrcTasks failed, taskId:0x%" PRIx64, pTask->taskId);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Validate the query plan and build the job's level/task structure from it.
 *
 * Sets pJob->queryId and allocates pJob->dataSrcTasks and pJob->levels. For
 * each plan level, it creates one SSchTask per subplan (schInitTask) and
 * registers the task:
 *   - in dataSrcTasks (when data-bound);
 *   - in a temporary subplan->task map;
 *   - in pJob->taskList.
 * It then links parents and children via schBuildTaskRalation.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT for an empty plan or level and
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure; errors from schInitTask and
 * schBuildTaskRalation are propagated. The temporary map is released on every
 * path after it is created.
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
  if (NULL == pJob->dataSrcTasks) {
    SCH_JOB_ELOG("taosArrayInit %d dataSrcTasks failed", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Keyed by the SSubplan pointer value itself, hence the pointer-sized integer hash.
  SHashObj *planToTask = taosHashInit(
      pDag->numOfSubplans,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    SCH_JOB_ELOG("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_INIT;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    // Sized for exactly taskNum tasks. The task pointers stored in the maps below
    // point into this array. NOTE(review): that relies on taosArrayPush not
    // reallocating while size stays within the initial capacity.
    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == pTask) {
        // pTask is NULL here, so log at job level rather than through the task macros.
        SCH_JOB_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel));

      SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

H
Hongze Cheng 已提交
372
/*
 * Copy the job's execution result into pRes.
 *
 * The result payload pointer (execRes.res) is handed to pRes and cleared in
 * the job, so the job no longer references it afterwards. The reported code
 * is the job's current errCode. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
  pRes->code = atomic_load_32(&pJob->errCode);
  pRes->numOfRows = pJob->resNumOfRows;

  pRes->msgType = pJob->execRes.msgType;
  pRes->numOfBytes = pJob->execRes.numOfBytes;
  pRes->res = pJob->execRes.res;

  // Move, not copy: the job must not free the payload it just handed out.
  pJob->execRes.res = NULL;

  SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
385
/*
 * Hand the job's pending fetch result to the caller through *pData.
 *
 * Runs under pJob->resLock and marks the job as fetched. If the pending
 * result is flagged completed, the job is first switched to SUCC. The
 * pending result is then detached from the job (pJob->fetchRes becomes NULL).
 * When there was none, an empty SRetrieveTableRsp marked completed is
 * allocated instead.
 *
 * Returns TSDB_CODE_SUCCESS, the error from the status switch, or
 * TSDB_CODE_OUT_OF_MEMORY if the empty response cannot be allocated.
 * Previously that OOM was reported as success with *pData == NULL.
 */
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
  int32_t code = 0;

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  pJob->fetched = true;

  if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) {
    SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
  }

  // Atomically take ownership of the pending result, leaving NULL behind.
  while (true) {
    *pData = atomic_load_ptr(&pJob->fetchRes);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->fetchRes, *pData, NULL)) {
      continue;
    }

    break;
  }

  if (NULL == *pData) {
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
    if (NULL == rsp) {
      SCH_JOB_ELOG("calloc %d for empty fetch rsp failed", (int32_t)sizeof(SRetrieveTableRsp));
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    rsp->completed = 1;

    *pData = rsp;
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
  }

  SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows);

_return:

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return code;
}

H
Hongze Cheng 已提交
424 425
/*
 * Deliver the job's execution result to the asynchronous exec callback.
 *
 * A fresh SExecResult is filled by schDumpJobExecRes and passed to
 * userRes.execFp; it is not freed here. NOTE(review): presumably the callback
 * owns it — confirm. If the allocation fails, the callback still runs with a
 * NULL result and the job's error code. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schNotifyUserExecRes(SSchJob *pJob) {
  SExecResult *pExecRes = taosMemoryCalloc(1, sizeof(SExecResult));
  if (NULL != pExecRes) {
    schDumpJobExecRes(pJob, pExecRes);
  }

  SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
  int32_t jobCode = atomic_load_32(&pJob->errCode);
  (*pJob->userRes.execFp)(pExecRes, pJob->userRes.cbParam, jobCode);
  SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
437 438 439
/*
 * Deliver the job's fetch result to the asynchronous fetch callback.
 *
 * The data produced by schDumpJobFetchRes (possibly NULL; its return code is
 * not checked here) is passed to userRes.fetchFp together with the job's
 * current error code. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schNotifyUserFetchRes(SSchJob *pJob) {
  void *pFetchData = NULL;
  schDumpJobFetchRes(pJob, &pFetchData);

  SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode));
  int32_t jobCode = atomic_load_32(&pJob->errCode);
  (*pJob->userRes.fetchFp)(pFetchData, pJob->userRes.cbParam, jobCode);
  SCH_JOB_DLOG("sch end from fetch cb, code: %s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
449
/*
 * Wake up whoever waits for the result of the job's current operation.
 *
 * op == 0 accepts any current operation. A non-zero op must equal the
 * operation recorded in pJob->opStatus, otherwise nothing is posted. Sync
 * operations are woken through rspSem; async exec and fetch operations get
 * their user callback invoked. The operation kind is sampled under
 * opStatus.lock, and the lock is released before posting or calling back.
 */
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (SCH_OP_NULL == pJob->opStatus.op) {
    SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  if (op && pJob->opStatus.op != op) {
    SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  // Classify while still holding the lock, in the same priority order.
  bool inSyncOp = SCH_JOB_IN_SYNC_OP(pJob);
  bool inAsyncExec = !inSyncOp && SCH_JOB_IN_ASYNC_EXEC_OP(pJob);
  bool inAsyncFetch = !inSyncOp && !inAsyncExec && SCH_JOB_IN_ASYNC_FETCH_OP(pJob);

  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (inSyncOp) {
    tsem_post(&pJob->rspSem);
  } else if (inAsyncExec) {
    schNotifyUserExecRes(pJob);
  } else if (inAsyncFetch) {
    schNotifyUserFetchRes(pJob);
  } else {
    SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
  }
}

D
dapan1121 已提交
483
/*
 * Record errCode on the job and post the job result to any waiter.
 *
 * An incoming TSDB_CODE_SCH_IGNORE_ERROR is returned immediately without
 * touching the job. Every path returns TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  if (errCode == TSDB_CODE_SCH_IGNORE_ERROR) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  schUpdateJobErrCode(pJob, errCode);

  int32_t jobCode = atomic_load_32(&pJob->errCode);
  if (TSDB_CODE_SUCCESS != jobCode) {
    SCH_JOB_DLOG("job failed with error %s", tstrerror(jobCode));
  }

  // 0: post to whatever operation is currently in progress.
  schPostJobRes(pJob, 0);

  SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}

D
dapan1121 已提交
500
/*
 * Switch the job to FAIL, carrying errCode along with the status change.
 *
 * TSDB_CODE_SCH_IGNORE_ERROR is not treated as a failure. The result of
 * schSwitchJobStatus is not checked. Always returns
 * TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}

H
Hongze Cheng 已提交
509
/* A dropped job is processed exactly like a failed one. */
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
D
dapan1121 已提交
510 511 512 513 514 515 516 517

/*
 * Switch the job to DROP, carrying errCode along with the status change.
 *
 * TSDB_CODE_SCH_IGNORE_ERROR skips the switch. The result of
 * schSwitchJobStatus is not checked. Always returns
 * TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}

H
Hongze Cheng 已提交
520
/*
 * React to the job reaching partial success.
 *
 * If a FETCH operation (sync or async) is in progress, the fetch task is
 * launched and its error is propagated. Otherwise the job result is posted
 * to whichever operation is current.
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  // sync == -1 matches the FETCH operation regardless of its sync flag.
  if (!schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
    schPostJobRes(pJob, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
530
/* Post fetched data, but only to a FETCH operation. */
void schProcessOnDataFetched(SSchJob *pJob) {
  schPostJobRes(pJob, SCH_OP_FETCH);
}
D
dapan1121 已提交
531

D
dapan1121 已提交
532
/*
 * Handle the retrieve response of an EXPLAIN task.
 *
 * Steps:
 *   1. Store the row count (pRsp->numOfRows converted with htobe64).
 *   2. Keep pRsp itself as the job's pending fetch result (pJob->fetchRes);
 *      it is not freed here.
 *   3. Mark pTask SUCC.
 *   4. Post the result to a FETCH operation.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  // NOTE(review): numOfRows presumably arrives in network byte order — confirm.
  int64_t rowNum = htobe64(pRsp->numOfRows);

  SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", rowNum, pRsp->completed);

  atomic_store_64(&pJob->resNumOfRows, rowNum);
  atomic_store_ptr(&pJob->fetchRes, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);

  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
545
/*
 * Count pTask as done in its level. Once every task of that level is done,
 * move pJob->levelIdx one level towards level 0 and launch the tasks of the
 * new level that have no children. Only applies to query jobs.
 *
 * Level 0 is the top level (schBuildTaskRalation requires it to have no
 * parents), so nothing is left to launch once it completes. The previous
 * code decremented levelIdx to -1 there and dereferenced the result of
 * taosArrayGet without checking it. The loop variable no longer shadows the
 * pTask parameter.
 */
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  SSchLevel *pLevel = pTask->level;
  int32_t    doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1);
  if (doneNum != pLevel->taskNum) {
    return TSDB_CODE_SUCCESS;
  }

  if (pJob->levelIdx <= 0) {
    SCH_JOB_DLOG("no upper level to launch, levelIdx:%d", pJob->levelIdx);
    return TSDB_CODE_SUCCESS;
  }

  pJob->levelIdx--;

  SSchLevel *pNextLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
  if (NULL == pNextLevel) {
    SCH_JOB_ELOG("level not found, levelIdx:%d", pJob->levelIdx);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  for (int32_t i = 0; i < pNextLevel->taskNum; ++i) {
    SSchTask *pNextTask = taosArrayGet(pNextLevel->subTasks, i);

    // Only tasks without children are launched here.
    if (pNextTask->children && taosArrayGetSize(pNextTask->children) > 0) {
      continue;
    }

    SCH_ERR_RET(schLaunchTask(pJob, pNextTask));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
570
/*
 * Append the table version info from a query response to the job's exec result.
 *
 * Responses with an empty tbFName are ignored. Otherwise, under
 * pJob->resLock:
 *   - the STbVerInfo array is created lazily;
 *   - {tbFName, sversion, tversion} is appended;
 *   - the result message type is set to TDMT_SCH_QUERY.
 *
 * Changes from the previous version:
 *   - the table name is copied with a bounded, always-terminated copy instead
 *     of strcpy;
 *   - tbInfo is zero-initialized;
 *   - a failed append is reported as TSDB_CODE_OUT_OF_MEMORY instead of being
 *     ignored.
 */
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
  if (0 == rsp->tbFName[0]) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  if (NULL == pJob->execRes.res) {
    pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
    if (NULL == pJob->execRes.res) {
      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  STbVerInfo tbInfo = {0};
  snprintf(tbInfo.tbFName, sizeof(tbInfo.tbFName), "%s", rsp->tbFName);
  tbInfo.sversion = rsp->sversion;
  tbInfo.tversion = rsp->tversion;

  if (NULL == taosArrayPush((SArray *)pJob->execRes.res, &tbInfo)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  pJob->execRes.msgType = TDMT_SCH_QUERY;

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return TSDB_CODE_SUCCESS;
}

596
/*
 * Look up taskId in the job's task list and store the task in *pTask.
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR when the task is not found.
 */
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
  schGetTaskFromList(pJob->taskList, taskId, pTask);

  bool found = (NULL != *pTask);
  if (!found) {
    SCH_JOB_ELOG("task not found in job task list, taskId:0x%" PRIx64, taskId);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
606 607
/*
 * Start executing the job.
 *
 * For a static EXPLAIN, the explain result is produced locally into
 * pJob->fetchRes and the job moves directly to PART_SUCC. Otherwise the
 * tasks of the current level (pJob->levelIdx) are launched.
 */
int32_t schLaunchJob(SSchJob *pJob) {
  if (EXPLAIN_MODE_STATIC != pJob->attr.explainMode) {
    SSchLevel *pStartLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
    SCH_ERR_RET(schLaunchLevelTasks(pJob, pStartLevel));
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
  SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));

  return TSDB_CODE_SUCCESS;
}

/*
 * Drop the tasks still tracked in pJob->execTasks.
 * Succeeded and failed tasks are not tracked separately, so nothing else is dropped here.
 */
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *pRunning = pJob->execTasks;
  schDropTaskInHashList(pJob, pRunning);
}

void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
  int64_t  refId = pJob->refId;

D
dapan1121 已提交
633 634
  qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);

D
dapan1121 已提交
635 636 637 638 639 640 641 642 643
  schDropJobAllTasks(pJob);

  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
  for (int32_t i = 0; i < numOfLevels; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
644
      schFreeTask(pJob, pTask);
D
dapan1121 已提交
645 646 647 648 649 650 651 652
    }

    taosArrayDestroy(pLevel->subTasks);
  }

  schFreeFlowCtrl(pJob);

  taosHashCleanup(pJob->execTasks);
H
Hongze Cheng 已提交
653 654
  //  taosHashCleanup(pJob->failTasks);
  //  taosHashCleanup(pJob->succTasks);
D
dapan1121 已提交
655
  taosHashCleanup(pJob->taskList);
H
Hongze Cheng 已提交
656

D
dapan1121 已提交
657 658 659 660 661 662
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
  taosArrayDestroy(pJob->dataSrcTasks);

  qExplainFreeCtx(pJob->explainCtx);

D
dapan1121 已提交
663
  destroyQueryExecRes(&pJob->execRes);
D
dapan1121 已提交
664

D
dapan1121 已提交
665
  qDestroyQueryPlan(pJob->pDag);
666
  nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);
D
dapan1121 已提交
667

D
dapan1121 已提交
668
  taosMemoryFreeClear(pJob->userRes.execRes);
D
dapan1121 已提交
669
  taosMemoryFreeClear(pJob->fetchRes);
D
dapan1121 已提交
670
  taosMemoryFreeClear(pJob->sql);
D
dapan1121 已提交
671
  taosMemoryFree(pJob);
D
dapan1121 已提交
672

D
dapan1121 已提交
673 674 675 676
  int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
  if (jobNum == 0) {
    schCloseJobRef();
  }
D
dapan1121 已提交
677 678

  qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
679 680
}

D
dapan1121 已提交
681
/*
 * Fetch result rows for the job.
 *
 * Static EXPLAIN: the result already exists. A sync FETCH dumps it directly;
 * otherwise it is posted to the FETCH operation.
 * Other jobs: the fetch task is launched. A sync FETCH then blocks on rspSem
 * and dumps the result into pJob->userRes.fetchRes.
 */
int32_t schJobFetchRows(SSchJob *pJob) {
  bool syncFetch = false;

  if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) {
    syncFetch = schChkCurrentOp(pJob, SCH_OP_FETCH, true);
    if (syncFetch) {
      SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
    }

    schPostJobRes(pJob, SCH_OP_FETCH);
    SCH_RET(TSDB_CODE_SUCCESS);
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));

  syncFetch = schChkCurrentOp(pJob, SCH_OP_FETCH, true);
  if (syncFetch) {
    SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
    SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
  }

  SCH_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
703
/*
 * Create a scheduler job from pReq and register it in schMgmt.jobRef.
 *
 * On success, *pJobId receives the job's ref id. On failure:
 *   - if the job was never allocated, the request's DAG is destroyed;
 *   - if the job was allocated but never registered, it is freed directly
 *     with schFreeJobImpl (which also destroys the DAG it took over);
 *   - if it was registered, the reference is removed.
 *
 * Fixes relative to the previous version:
 *   - calloc leaves refId at 0, and ref ids handed out by taosAddRef are
 *     positive. A failure before registration therefore used to call
 *     taosRemoveRef(jobRef, 0) and leak the job. Any refId <= 0 is now
 *     treated as "not registered".
 *   - schFreeJobImpl always decrements schMgmt.jobNum. The counter is now
 *     incremented as soon as the job exists, so every free path is balanced;
 *     it used to be incremented only after successful registration.
 */
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Balanced by the decrement in schFreeJobImpl on every path that frees this job.
  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
  pJob->attr.localExec = pReq->localReq;
  pJob->conn = *pReq->pConn;
  if (pReq->sql) {
    pJob->sql = strdup(pReq->sql);
  }
  pJob->pDag = pReq->pDag;
  pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
  pJob->chkKillFp = pReq->chkKillFp;
  pJob->chkKillParam = pReq->chkKillParam;
  pJob->userRes.execFp = pReq->execFp;
  pJob->userRes.cbParam = pReq->cbParam;

  if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
  } else {
    pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
  }

  pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                HASH_ENTRY_LOCK);
  if (NULL == pJob->taskList) {
    SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
  }

  pJob->execTasks = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                 HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  tsem_init(&pJob->rspSem, 0, 0);

  pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
  if (pJob->refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  *pJobId = pJob->refId;

  SCH_JOB_DLOG("job refId:0x%" PRIx64 " created", pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  if (NULL == pJob) {
    qDestroyQueryPlan(pReq->pDag);
  } else if (pJob->refId <= 0) {
    // Not registered in schMgmt.jobRef: 0 is the calloc default, negative means taosAddRef failed.
    schFreeJobImpl(pJob);
  } else {
    // NOTE(review): assumes removing the last reference frees the job through
    // the ref set's free callback — confirm where schMgmt.jobRef is opened.
    taosRemoveRef(schMgmt.jobRef, pJob->refId);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
780 781
/*
 * Launch the job. For a synchronous request, block on rspSem until the job
 * posts its result. Launch errors are propagated; otherwise returns
 * TSDB_CODE_SUCCESS.
 */
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
  qDebug("QID:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId);

  SCH_ERR_RET(schLaunchJob(pJob));

  bool waitForRsp = pReq->syncReq;
  if (waitForRsp) {
    SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
796
/*
 * Report errCode straight to the request's user callback without a result.
 *
 * Nothing happens for a NULL request or a synchronous request. Otherwise,
 * execFp is preferred when set; fetchFp is used only when execFp is NULL.
 * Both callbacks are invoked with a NULL result and pReq->cbParam.
 */
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
  bool hasAsyncCallback = (NULL != pReq) && !pReq->syncReq;
  if (!hasAsyncCallback) {
    return;
  }

  if (NULL != pReq->execFp) {
    (*pReq->execFp)(NULL, pReq->cbParam, errCode);
    return;
  }

  if (NULL != pReq->fetchFp) {
    (*pReq->fetchFp)(NULL, pReq->cbParam, errCode);
  }
}
D
dapan1121 已提交
807

D
dapan1121 已提交
808 809
/*
 * Check, under the op-status read lock, whether the job is currently in
 * operation `op`.
 *
 * sync: when >= 0, the job's recorded syncReq flag must also equal `sync`;
 *       a negative value skips that comparison.
 */
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
  SCH_LOCK(SCH_READ, &pJob->opStatus.lock);

  bool matched = (pJob->opStatus.op == op);
  if (matched && sync >= 0) {
    matched = (pJob->opStatus.syncReq == sync);
  }

  SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);

  return matched;
}

H
Hongze Cheng 已提交
821
/*
 * Finish a user-level operation (exec / fetch / get-status) on a job.
 *
 * For a synchronous EXEC or FETCH request:
 *   - resets opStatus.op from `type` back to SCH_OP_NULL under the op-status
 *     write lock, logging an error if the job was not in that operation;
 *   - for EXEC only, then passes pReq->pExecRes to schDumpJobExecRes.
 *
 * GET_STATUS never reports a failure (errCode is forced to success).
 * Any remaining non-zero errCode is handed to schHandleJobFailure.
 */
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode) {
  int32_t prevOp = 0;
  bool    isSyncExecOrFetch = (SCH_OP_EXEC == type || SCH_OP_FETCH == type) && (NULL != pReq) && pReq->syncReq;

  if (isSyncExecOrFetch) {
    SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
    prevOp = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
    if (SCH_OP_NULL == prevOp || prevOp != type) {
      SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(prevOp),
                   jobTaskStatusStr(pJob->status));
    }
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

    if (SCH_OP_EXEC == type) {
      schDumpJobExecRes(pJob, pReq->pExecRes);
    }
  } else if (SCH_OP_GET_STATUS == type) {
    errCode = TSDB_CODE_SUCCESS;
  }

  if (errCode) {
    schHandleJobFailure(pJob, errCode);
  }

  SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
}

H
Hongze Cheng 已提交
862
/*
 * Start a user-level operation on a job.
 *
 * GET_STATUS: succeeds only if the job status is at least
 *   JOB_TASK_STATUS_INIT and it has levels; no op-status is claimed.
 *
 * EXEC / FETCH:
 *   - atomically claims opStatus.op (SCH_OP_NULL -> type) under the op-status
 *     write lock;
 *   - if another operation is already active, posts TSDB_CODE_APP_ERROR to
 *     the request callback and returns it;
 *   - records pReq->syncReq, and for FETCH also records the user result
 *     buffer/callback;
 *   - FETCH then requires SCH_JOB_NEED_FETCH and a PART_SUCC status captured
 *     on entry;
 *   - finally returns TSDB_CODE_SCH_IGNORE_ERROR if schJobNeedToStop says
 *     the job should stop.
 *
 * Any other type is rejected with TSDB_CODE_APP_ERROR.
 */
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq) {
  // NOTE(review): appears unused unless a SCH_* macro references it — confirm before removing.
  int32_t code = 0;
  int8_t  status = SCH_GET_JOB_STATUS(pJob);

  if (SCH_OP_GET_STATUS == type) {
    if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
      qDebug("job not initialized or not executable job, refId:0x%" PRIx64, pJob->refId);
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_OP_EXEC != type && SCH_OP_FETCH != type) {
    SCH_JOB_ELOG("unknown operation type %d", type);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  // Claim the op slot; only one EXEC/FETCH may be in flight per job.
  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
  if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
    SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    schDirectPostJobRes(pReq, TSDB_CODE_APP_ERROR);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));

  if (SCH_OP_FETCH == type) {
    pJob->userRes.fetchRes = pReq->pFetchRes;
    pJob->userRes.fetchFp = pReq->fetchFp;
    pJob->userRes.cbParam = pReq->cbParam;
  }

  pJob->opStatus.syncReq = pReq->syncReq;
  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

  // NOTE(review): these FETCH checks run after opStatus.op was set to FETCH and are
  // not undone here on failure; confirm the caller resets the op state in that case.
  if (SCH_OP_FETCH == type) {
    if (!SCH_JOB_NEED_FETCH(pJob)) {
      SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
      SCH_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    if (status != JOB_TASK_STATUS_PART_SUCC) {
      SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
  }

  if (schJobNeedToStop(pJob, &status)) {
    SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
928

D
dapan1121 已提交
929 930 931 932
/*
 * Epilogue for callback processing; mirrors schProcessOnCbBegin.
 *
 * The steps run in this order:
 *   1. unlock the task (if any);
 *   2. hand a non-zero errCode to schHandleJobFailure;
 *   3. release the job reference (if any) by its refId.
 */
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (NULL != pTask) {
    SCH_UNLOCK_TASK(pTask);
  }

  if (errCode != 0) {
    schHandleJobFailure(pJob, errCode);
  }

  if (NULL != pJob) {
    schReleaseJob(pJob->refId);
  }
}

H
Hongze Cheng 已提交
943
/*
 * Prologue for callback processing.
 *
 * Steps:
 *   - acquires the job by refId rId; if it is gone, returns
 *     TSDB_CODE_QRY_JOB_NOT_EXIST;
 *   - bails out with TSDB_CODE_SCH_IGNORE_ERROR if the job needs to stop;
 *   - looks up task tId and locks it.
 *
 * On success *job and *task receive the acquired job and the locked task;
 * schProcessOnCbEnd performs the matching unlock/release. On failure any
 * acquired task lock and job reference are released here, and *job / *task
 * are left untouched.
 *
 * qId and tId are also used for logging.
 */
int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId) {
  int32_t code = 0;
  int8_t  status = 0;

  SSchTask *pTask = NULL;
  SSchJob  *pJob = schAcquireJob(rId);
  if (NULL == pJob) {
    // Space before "job" keeps the TID value from running into the message text.
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
    SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
  }

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_ERR_JRET(schGetTaskInJob(pJob, tId, &pTask));

  SCH_LOCK_TASK(pTask);

  *job = pJob;
  *task = pTask;

  return TSDB_CODE_SUCCESS;

_return:

  if (pTask) {
    SCH_UNLOCK_TASK(pTask);
  }
  if (pJob) {
    schReleaseJob(rId);
  }

  SCH_RET(code);
}