schJob.c 29.0 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
D
dapan1121 已提交
19
#include "schInt.h"
D
dapan1121 已提交
20 21 22 23
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

D
dapan1121 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
/*
 * Record errCode as the job's error code.
 *
 * TSDB_CODE_SUCCESS is ignored. When the job has no error yet, errCode is
 * installed with a compare-and-swap. Otherwise the stored code is kept if
 * NEED_CLIENT_HANDLE_ERROR() holds for it; if not, errCode replaces it only
 * when NEED_CLIENT_HANDLE_ERROR() holds for errCode. A debug line is logged
 * whenever this function writes a new code.
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (errCode == TSDB_CODE_SUCCESS) {
    return;
  }

  int32_t curCode = atomic_load_32(&pJob->errCode);
  if (curCode == TSDB_CODE_SUCCESS) {
    int32_t seen = atomic_val_compare_exchange_32(&pJob->errCode, curCode, errCode);
    if (seen == curCode) {
      SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
      return;
    }

    // Another thread stored a code first; decide against that one.
    curCode = atomic_load_32(&pJob->errCode);
  }

  bool replace = !NEED_CLIENT_HANDLE_ERROR(curCode) && NEED_CLIENT_HANDLE_ERROR(errCode);
  if (!replace) {
    return;
  }

  atomic_store_32(&pJob->errCode, errCode);
  SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
}

D
dapan1121 已提交
53 54
/* Whether the job is in a terminal status: FAIL, DROP or SUCC. */
bool schJobDone(SSchJob *pJob) {
  switch (SCH_GET_JOB_STATUS(pJob)) {
    case JOB_TASK_STATUS_FAIL:
    case JOB_TASK_STATUS_DROP:
    case JOB_TASK_STATUS_SUCC:
      return true;
    default:
      return false;
  }
}
D
dapan1121 已提交
58

D
dapan1121 已提交
59 60 61 62 63 64
/*
 * Decide whether the job should stop.
 *
 * If pStatus is non-NULL it receives the current job status. Returns true when
 * the job is already in a terminal status, or when the user kill check
 * (pJob->chkKillFp) fires; in the latter case the job error code is set to
 * TSDB_CODE_TSC_QUERY_KILLED first. Returns false otherwise.
 */
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t curStatus = SCH_GET_JOB_STATUS(pJob);
  if (NULL != pStatus) {
    *pStatus = curStatus;
  }

  if (schJobDone(pJob)) {
    return true;
  }

  bool killed = (*pJob->chkKillFp)(pJob->chkKillParam);
  if (!killed) {
    return false;
  }

  schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
  return true;
}

D
dapan1121 已提交
77
/*
 * Move the job to newStatus, validating the transition and retrying the
 * compare-and-swap if the status changes concurrently.
 *
 * Accepted transitions:
 *   NULL      -> INIT
 *   INIT      -> EXEC | DROP
 *   EXEC      -> PART_SUCC | FAIL | DROP
 *   PART_SUCC -> FAIL | SUCC | DROP | EXEC
 *   SUCC/FAIL -> DROP
 *
 * Returns TSDB_CODE_SUCCESS on success. Error returns:
 *   TSDB_CODE_SCH_IGNORE_ERROR - the job is already in newStatus (debug log only);
 *   TSDB_CODE_QRY_JOB_FREED    - the job is in DROP;
 *   TSDB_CODE_APP_ERROR        - any other transition, or an unknown current status.
 */
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);
    if (newStatus == oriStatus) {
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    bool allowed = false;
    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        allowed = (JOB_TASK_STATUS_INIT == newStatus);
        break;
      case JOB_TASK_STATUS_INIT:
        allowed = (JOB_TASK_STATUS_EXEC == newStatus || JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_EXEC:
        allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus || JOB_TASK_STATUS_FAIL == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_PART_SUCC:
        allowed = (JOB_TASK_STATUS_FAIL == newStatus || JOB_TASK_STATUS_SUCC == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus || JOB_TASK_STATUS_EXEC == newStatus);
        break;
      case JOB_TASK_STATUS_SUCC:
      case JOB_TASK_STATUS_FAIL:
        allowed = (JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_DROP:
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;
      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
        break;
    }

    if (!allowed) {
      SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
    }

    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      return TSDB_CODE_SUCCESS;
    }

    // The status changed under us: re-validate the transition from the new value.
  }

_return:

  if (TSDB_CODE_SCH_IGNORE_ERROR == code) {
    SCH_JOB_DLOG("ignore job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  } else {
    SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  }

  SCH_RET(code);
}

D
dapan1121 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
/*
 * Link every task of the job to its child and parent tasks.
 *
 * planToTask maps a SSubplan pointer (the key) to the SSchTask built for it.
 * For each task, pTask->children / pTask->parents are filled from the
 * subplan's pChildren / pParents lists.
 *
 * Structural checks:
 *   - tasks on level pJob->levelIdx must have no children;
 *   - tasks on level 0 must have no parents, while every other task needs at least one;
 *   - for query jobs, level 0 holds at most one task. attr.needFetch is set
 *     unless that task is a SUBPLAN_TYPE_MODIFY subplan.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR for an inconsistent plan and
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure.
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("children info, the %d child TID 0x%" PRIx64, n, (*childTask)->taskId);
      }

      // Only level 0 may be parent-less; every other level must have parents.
      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n indexes the parent list here, so label it as such in the log.
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("parents info, the %d parent TID 0x%" PRIx64, n, (*parentTask)->taskId);
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob)) {
    if (pLevel->taskNum > 1) {
      SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
    if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType) {
      pJob->attr.needFetch = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
245
/*
 * Register pTask in pJob->dataSrcTasks if it is a data-bound query task
 * (SCH_IS_DATA_BIND_QRY_TASK). Other tasks are ignored.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the push fails. Previously that failure
 * was silently dropped, leaving the task missing from dataSrcTasks.
 */
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
    SCH_TASK_ELOG("taosArrayPush task to dataSrcTasks failed, size:%d", (int32_t)taosArrayGetSize(pJob->dataSrcTasks));
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Validate the query plan pDag and build the job's level/task structure.
 *
 * Steps:
 *   - set pJob->queryId;
 *   - allocate pJob->dataSrcTasks and pJob->levels (one SSchLevel per entry of
 *     pDag->pSubplans);
 *   - create one SSchTask per subplan via schInitTask();
 *   - register each task in pJob->taskList (keyed by taskId) and, through
 *     schAppendJobDataSrc(), in pJob->dataSrcTasks;
 *   - link parents and children with schBuildTaskRalation().
 * pJob->levelIdx starts at the last level (levelNum - 1).
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT for an empty or malformed plan and
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure. On error, only the local
 * planToTask map is released here; whatever was attached to pJob stays there.
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
  if (NULL == pJob->dataSrcTasks) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Local map from subplan pointer to task pointer, used only to wire up parent/child links.
  SHashObj *planToTask = taosHashInit(
      pDag->numOfSubplans,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    // Log the capacity that was actually requested.
    SCH_JOB_ELOG("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_INIT;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    // Pre-sized to taskNum. The task pointers stored in planToTask/taskList below
    // assume these taskNum pushes never reallocate the array.
    // NOTE(review): relies on taosArrayPush not growing a pre-sized array.
    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == pTask) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel));

      SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

H
Hongze Cheng 已提交
372
/*
 * Copy the job's execution result into pRes.
 *
 * The payload pointer moves to pRes->res and pJob->execRes.res is cleared, so
 * the job no longer references it. pRes->code is the job's current error code.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
  pRes->code = atomic_load_32(&pJob->errCode);
  pRes->numOfRows = pJob->resNumOfRows;
  pRes->msgType = pJob->execRes.msgType;
  pRes->numOfBytes = pJob->execRes.numOfBytes;

  // Hand the payload over: after this point only pRes refers to it.
  pRes->res = pJob->execRes.res;
  pJob->execRes.res = NULL;

  SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
385
/*
 * Take the pending fetch result out of the job and return it through *pData.
 *
 * Runs under pJob->resLock and marks the job as fetched. If the pending result
 * is flagged completed, the job is first switched to JOB_TASK_STATUS_SUCC.
 * pJob->fetchRes is atomically swapped to NULL and its old value becomes
 * *pData. If there was no pending result, a freshly allocated empty
 * SRetrieveTableRsp with completed = 1 is returned instead.
 *
 * Returns the status-switch error if that fails. Returns
 * TSDB_CODE_OUT_OF_MEMORY if the empty response cannot be allocated;
 * previously that case reported success with *pData == NULL.
 */
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
  int32_t code = 0;

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  pJob->fetched = true;

  if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) {
    SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
  }

  // Detach the pending result: swap pJob->fetchRes to NULL, retrying on races.
  while (true) {
    *pData = atomic_load_ptr(&pJob->fetchRes);
    if (*pData == atomic_val_compare_exchange_ptr(&pJob->fetchRes, *pData, NULL)) {
      break;
    }
  }

  if (NULL == *pData) {
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
    if (NULL == rsp) {
      SCH_JOB_ELOG("calloc %d failed for empty fetch rsp", (int32_t)sizeof(SRetrieveTableRsp));
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    rsp->completed = 1;
    *pData = rsp;
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
  }

  SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows);

_return:

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return code;
}

H
Hongze Cheng 已提交
424 425
/*
 * Invoke the user's async exec callback with the job's execution result.
 *
 * The result is dumped into a newly allocated SExecResult. If that allocation
 * fails, the callback still runs, with a NULL result. The callback also
 * receives the job's current error code. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schNotifyUserExecRes(SSchJob *pJob) {
  SExecResult *pExecRes = taosMemoryCalloc(1, sizeof(*pExecRes));
  if (NULL != pExecRes) {
    schDumpJobExecRes(pJob, pExecRes);
  }

  SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
  int32_t jobErr = atomic_load_32(&pJob->errCode);
  (*pJob->userRes.execFp)(pExecRes, pJob->userRes.cbParam, jobErr);
  SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
437 438 439
/*
 * Invoke the user's async fetch callback with the job's fetch result.
 *
 * The return code of schDumpJobFetchRes() is not checked here; the callback
 * receives whatever was dumped (possibly NULL) plus the job's current error
 * code. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schNotifyUserFetchRes(SSchJob *pJob) {
  void *pFetchRes = NULL;
  schDumpJobFetchRes(pJob, &pFetchRes);

  SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode));
  int32_t jobErr = atomic_load_32(&pJob->errCode);
  (*pJob->userRes.fetchFp)(pFetchRes, pJob->userRes.cbParam, jobErr);
  SCH_JOB_DLOG("sch end from fetch cb, code: %s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
449
/*
 * Deliver the job result to whoever is waiting on the job's current operation.
 *
 * op == 0 accepts any current operation. Otherwise the current operation must
 * equal op, or nothing is delivered. Nothing is delivered either when no
 * operation is in progress.
 *
 * Delivery depends on the operation:
 *   - sync operation: post rspSem;
 *   - async exec: run the user exec callback;
 *   - async fetch: run the user fetch callback.
 * opStatus.lock is always released before posting or invoking a callback.
 */
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (SCH_OP_NULL == pJob->opStatus.op) {
    SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  if (op && pJob->opStatus.op != op) {
    SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  if (SCH_JOB_IN_SYNC_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    tsem_post(&pJob->rspSem);
    return;
  }

  if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    schNotifyUserExecRes(pJob);
    return;
  }

  if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    schNotifyUserFetchRes(pJob);
    return;
  }

  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
  SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
}

D
dapan1121 已提交
483
/*
 * Record errCode on the job and post the job result to any waiter.
 *
 * If errCode is TSDB_CODE_SCH_IGNORE_ERROR, nothing else happens. In every
 * case the function returns TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  if (errCode == TSDB_CODE_SCH_IGNORE_ERROR) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  schUpdateJobErrCode(pJob, errCode);

  int32_t code = atomic_load_32(&pJob->errCode);
  if (TSDB_CODE_SUCCESS != code) {
    SCH_JOB_DLOG("job failed with error %s", tstrerror(code));
  }

  // 0 = deliver to whatever operation is currently in progress
  schPostJobRes(pJob, 0);

  SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}

D
dapan1121 已提交
500
/*
 * Switch the job to FAIL with errCode, unless errCode is
 * TSDB_CODE_SCH_IGNORE_ERROR. The switch result is discarded; the function
 * always returns TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
  if (errCode != TSDB_CODE_SCH_IGNORE_ERROR) {
    schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}

H
Hongze Cheng 已提交
509
/* A dropped job is processed exactly like a failed one. */
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  int32_t failRet = schProcessOnJobFailure(pJob, errCode);
  SCH_RET(failRet);
}
D
dapan1121 已提交
510 511 512 513 514 515 516 517

/*
 * Switch the job to DROP with errCode, unless errCode is
 * TSDB_CODE_SCH_IGNORE_ERROR. The switch result is discarded; the function
 * always returns TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
  if (errCode != TSDB_CODE_SCH_IGNORE_ERROR) {
    schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}

H
Hongze Cheng 已提交
520
/*
 * React to the job reaching partial success.
 *
 * If a FETCH operation is in progress (sync or async, since sync = -1 skips
 * the mode check), the fetch task is launched. Otherwise the job result is
 * posted to the current operation.
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  if (!schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
    schPostJobRes(pJob, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
530
/* Post the job result, but only to a waiter whose current operation is SCH_OP_FETCH. */
void schProcessOnDataFetched(SSchJob *pJob) {
  schPostJobRes(pJob, SCH_OP_FETCH);
}
D
dapan1121 已提交
531

D
dapan1121 已提交
532
/*
 * Store an explain response as the job's fetch result.
 *
 * Steps:
 *   - record the row count;
 *   - publish pRsp through pJob->fetchRes;
 *   - mark pTask as succeeded;
 *   - notify a pending FETCH operation.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  // numOfRows is byte-swapped with htobe64 (network byte order) before use.
  uint64_t rows = htobe64(pRsp->numOfRows);

  SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", rows, pRsp->completed);

  atomic_store_64(&pJob->resNumOfRows, rows);
  atomic_store_ptr(&pJob->fetchRes, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);

  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
545
/*
 * Count pTask as done on its level; once the level is complete, launch the next one.
 *
 * Only query jobs are handled; other jobs return immediately. The task is
 * counted in its level's taskDoneNum. Only the caller that finishes the
 * level's last task (the atomic increment makes that caller unique) goes on.
 * It moves pJob->levelIdx one step toward level 0 and launches every task on
 * that level that has no children.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR if there is no level left to move to.
 * Previously that case dereferenced an invalid level. Otherwise returns the
 * first schLaunchTask() error.
 */
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  SSchLevel *pLevel = pTask->level;
  int32_t    doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1);
  if (doneNum != pLevel->taskNum) {
    return TSDB_CODE_SUCCESS;
  }

  pJob->levelIdx--;
  if (pJob->levelIdx < 0) {
    SCH_JOB_ELOG("no next level to launch, levelIdx:%d", pJob->levelIdx);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SSchLevel *pNextLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
  if (NULL == pNextLevel) {
    SCH_JOB_ELOG("level %d not found in job", pJob->levelIdx);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  for (int32_t i = 0; i < pNextLevel->taskNum; ++i) {
    SSchTask *pNextTask = taosArrayGet(pNextLevel->subTasks, i);

    if (pNextTask->children && taosArrayGetSize(pNextTask->children) > 0) {
      continue;
    }

    SCH_ERR_RET(schLaunchTask(pJob, pNextTask));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
570
/*
 * Record the table version info carried by a query response.
 *
 * If rsp names a table (non-empty tbFName), the steps below run under
 * pJob->resLock:
 *   - append {tbFName, sversion, tversion} to pJob->execRes.res, an SArray of
 *     STbVerInfo created on first use with capacity pJob->taskNum;
 *   - set execRes.msgType to TDMT_SCH_QUERY.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the array cannot be created or the push
 * fails. The push failure used to be ignored.
 */
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
  if (0 == rsp->tbFName[0]) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  if (NULL == pJob->execRes.res) {
    pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
    if (NULL == pJob->execRes.res) {
      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  // Bounded copy into a zeroed buffer: the destination always stays NUL-terminated.
  STbVerInfo tbInfo = {0};
  strncpy(tbInfo.tbFName, rsp->tbFName, sizeof(tbInfo.tbFName) - 1);
  tbInfo.sversion = rsp->sversion;
  tbInfo.tversion = rsp->tversion;

  if (NULL == taosArrayPush((SArray *)pJob->execRes.res, &tbInfo)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
    SCH_JOB_ELOG("taosArrayPush tbVerInfo failed, tbFName:%s", tbInfo.tbFName);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->execRes.msgType = TDMT_SCH_QUERY;

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return TSDB_CODE_SUCCESS;
}

596
/*
 * Look up a task of the job by taskId in pJob->taskList.
 *
 * *pTask receives the task (or NULL). Returns TSDB_CODE_SCH_INTERNAL_ERROR
 * and logs an error when the task is not found.
 */
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
  schGetTaskFromList(pJob->taskList, taskId, pTask);
  if (NULL != *pTask) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_JOB_ELOG("task not found in job task list, taskId:0x%" PRIx64, taskId);
  SCH_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}

D
dapan1121 已提交
606 607
/*
 * Start executing the job.
 *
 * Normal jobs launch the tasks of level pJob->levelIdx.
 *
 * Static-explain jobs launch no tasks. qExecStaticExplain() writes its result
 * into pJob->fetchRes, and the job then moves straight to PART_SUCC.
 */
int32_t schLaunchJob(SSchJob *pJob) {
  if (EXPLAIN_MODE_STATIC != pJob->attr.explainMode) {
    SSchLevel *pStartLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
    SCH_ERR_RET(schLaunchLevelTasks(pJob, pStartLevel));
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
  SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));

  return TSDB_CODE_SUCCESS;
}

/* Drop every task tracked in pJob->execTasks via schDropTaskInHashList(). */
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *pExecTasks = pJob->execTasks;
  schDropTaskInHashList(pJob, pExecTasks);
}

void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
  int64_t  refId = pJob->refId;

D
dapan1121 已提交
633 634
  qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);

D
dapan1121 已提交
635 636 637 638 639 640 641 642 643
  schDropJobAllTasks(pJob);

  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
  for (int32_t i = 0; i < numOfLevels; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
644
      schFreeTask(pJob, pTask);
D
dapan1121 已提交
645 646 647 648 649 650 651 652
    }

    taosArrayDestroy(pLevel->subTasks);
  }

  schFreeFlowCtrl(pJob);

  taosHashCleanup(pJob->execTasks);
H
Hongze Cheng 已提交
653 654
  //  taosHashCleanup(pJob->failTasks);
  //  taosHashCleanup(pJob->succTasks);
D
dapan1121 已提交
655
  taosHashCleanup(pJob->taskList);
H
Hongze Cheng 已提交
656

D
dapan1121 已提交
657 658 659 660 661 662
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
  taosArrayDestroy(pJob->dataSrcTasks);

  qExplainFreeCtx(pJob->explainCtx);

D
dapan1121 已提交
663
  destroyQueryExecRes(&pJob->execRes);
D
dapan1121 已提交
664

D
dapan1121 已提交
665
  qDestroyQueryPlan(pJob->pDag);
666
  nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);
D
dapan1121 已提交
667

D
dapan1121 已提交
668
  taosMemoryFreeClear(pJob->userRes.execRes);
D
dapan1121 已提交
669
  taosMemoryFreeClear(pJob->fetchRes);
D
dapan1121 已提交
670
  taosMemoryFreeClear(pJob->sql);
D
dapan1121 已提交
671
  tsem_destroy(&pJob->rspSem);
D
dapan1121 已提交
672
  taosMemoryFree(pJob);
D
dapan1121 已提交
673

D
dapan1121 已提交
674 675 676 677
  int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
  if (jobNum == 0) {
    schCloseJobRef();
  }
D
dapan1121 已提交
678 679

  qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
680 681
}

D
dapan1121 已提交
682
/*
 * Drive a fetch request on the job.
 *
 * Static-explain jobs already hold their result. A sync FETCH dumps it
 * directly; otherwise it is posted to the FETCH operation.
 *
 * Other jobs launch the fetch task first. For a sync FETCH, the call then
 * blocks on rspSem and dumps the result. Async fetches return right after the
 * launch.
 */
int32_t schJobFetchRows(SSchJob *pJob) {
  if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) {
    if (!schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
      schPostJobRes(pJob, SCH_OP_FETCH);
      return TSDB_CODE_SUCCESS;
    }

    SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));

  if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
    SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
    SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
704
/*
 * Create a scheduler job for pReq and register it in schMgmt.jobRef.
 *
 * Ownership of pReq->pDag passes to the job (schFreeJobImpl destroys it). If
 * the job cannot even be allocated, the plan is destroyed here. On success,
 * *pJobId receives the job's refId.
 *
 * Error handling:
 *   - failure before the job is registered: the job is freed directly with
 *     schFreeJobImpl();
 *   - failure after registration: the ref is removed instead.
 *
 * Two fixes versus the previous version:
 *   - refId is initialised to -1. calloc left it at 0, so early failures took
 *     the taosRemoveRef(.., 0) branch and leaked the job.
 *   - schMgmt.jobNum is incremented right after allocation, so the decrement
 *     in schFreeJobImpl() is balanced on every path, including a failed
 *     taosAddRef().
 */
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Not registered in schMgmt.jobRef yet.
  pJob->refId = -1;
  // Counted from now on; schFreeJobImpl() performs the matching decrement.
  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
  pJob->attr.localExec = pReq->localReq;
  pJob->conn = *pReq->pConn;
  if (pReq->sql) {
    pJob->sql = strdup(pReq->sql);
  }
  pJob->pDag = pReq->pDag;
  pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
  pJob->chkKillFp = pReq->chkKillFp;
  pJob->chkKillParam = pReq->chkKillParam;
  pJob->userRes.execFp = pReq->execFp;
  pJob->userRes.cbParam = pReq->cbParam;

  if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
  } else {
    pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
  }

  pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                HASH_ENTRY_LOCK);
  if (NULL == pJob->taskList) {
    SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
  }

  pJob->execTasks = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                 HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (tsem_init(&pJob->rspSem, 0, 0)) {
    SCH_JOB_ELOG("tsem_init failed, errno:%d", errno);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
  if (pJob->refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  *pJobId = pJob->refId;

  SCH_JOB_DLOG("job refId:0x%" PRIx64 " created", pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  if (NULL == pJob) {
    qDestroyQueryPlan(pReq->pDag);
  } else if (pJob->refId < 0) {
    // Never registered: free directly (this also balances the jobNum increment above).
    schFreeJobImpl(pJob);
  } else {
    taosRemoveRef(schMgmt.jobRef, pJob->refId);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
784 785
/*
 * Launch the job. For a sync request, block on rspSem until a result is
 * posted (for example by schPostJobRes()). Returns the launch error, if any.
 */
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
  qDebug("QID:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId);

  SCH_ERR_RET(schLaunchJob(pJob));

  bool waitForRsp = pReq->syncReq;
  if (waitForRsp) {
    SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
800
/*
 * Report errCode straight to an asynchronous caller without going through the
 * job's result path.
 *
 * Does nothing when pReq is NULL or the request is synchronous. Otherwise it
 * invokes execFp if set, else fetchFp if set. The callback receives a NULL
 * result, the caller's cbParam and errCode.
 */
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
  if (pReq != NULL && !pReq->syncReq) {
    if (NULL != pReq->execFp) {
      pReq->execFp(NULL, pReq->cbParam, errCode);
    } else if (NULL != pReq->fetchFp) {
      pReq->fetchFp(NULL, pReq->cbParam, errCode);
    }
  }
}
D
dapan1121 已提交
811

D
dapan1121 已提交
812 813
/*
 * Check, under the op-status read lock, whether pJob is currently running
 * operation `op`.
 *
 * When sync >= 0, the stored syncReq flag must also equal sync. A negative
 * sync means "ignore the sync/async mode".
 */
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
  bool matched = false;

  SCH_LOCK(SCH_READ, &pJob->opStatus.lock);
  matched = (op == pJob->opStatus.op) && (sync < 0 || sync == pJob->opStatus.syncReq);
  SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);

  return matched;
}

H
Hongze Cheng 已提交
825
/*
 * Release the operation slot held by a synchronous EXEC/FETCH request.
 *
 * Under the op-status write lock, CAS opStatus.op from `type` back to
 * SCH_OP_NULL. If the job was not in `type` at that moment, log an error.
 * Only called with SCH_OP_EXEC / SCH_OP_FETCH, never with SCH_OP_NULL.
 */
static void schEndSyncJobOp(SSchJob *pJob, SCH_OP_TYPE type) {
  int32_t op = 0;

  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
  op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
  if (op != type) {
    SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op),
                 jobTaskStatusStr(SCH_GET_JOB_STATUS(pJob)));
  }
  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
}

/*
 * Finish an operation of kind `type` on pJob.
 *
 * For a synchronous EXEC or FETCH, the job's operation slot is released. A
 * synchronous EXEC also copies the job's execution result into
 * pReq->pExecRes. For SCH_OP_GET_STATUS, errCode is forced to success, so a
 * failed status query never triggers job failure handling.
 *
 * Any remaining non-zero errCode is passed to schHandleJobFailure().
 */
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode) {
  bool syncReq = (NULL != pReq) && pReq->syncReq;

  switch (type) {
    case SCH_OP_EXEC:
      if (syncReq) {
        schEndSyncJobOp(pJob, type);
        schDumpJobExecRes(pJob, pReq->pExecRes);
      }
      break;
    case SCH_OP_FETCH:
      if (syncReq) {
        schEndSyncJobOp(pJob, type);
      }
      break;
    case SCH_OP_GET_STATUS:
      errCode = TSDB_CODE_SUCCESS;
      break;
    default:
      break;
  }

  if (errCode) {
    schHandleJobFailure(pJob, errCode);
  }

  SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
}

H
Hongze Cheng 已提交
866
/*
 * Begin an operation of kind `type` on pJob.
 *
 * - SCH_OP_GET_STATUS: only verifies that the job is initialized and has at
 *   least one level. Otherwise it returns TSDB_CODE_SCH_STATUS_ERROR.
 * - SCH_OP_EXEC / SCH_OP_FETCH: under the op-status write lock, claim the
 *   job's operation slot by CAS-ing opStatus.op from SCH_OP_NULL to `type`.
 *   If another operation already holds it, the caller is answered through
 *   schDirectPostJobRes() and TSDB_CODE_APP_ERROR is returned. The request's
 *   sync flag is then recorded; FETCH also records the caller's fetch
 *   buffer, callback and cbParam.
 *   - FETCH then requires the job to be fetchable and, in the status snapshot
 *     taken on entry, to be JOB_TASK_STATUS_PART_SUCC.
 *   - Both EXEC and FETCH return TSDB_CODE_SCH_IGNORE_ERROR when
 *     schJobNeedToStop() says the job must stop.
 * - Any other type: TSDB_CODE_APP_ERROR.
 *
 * NOTE(review): the FETCH validation failures return with the slot still
 * claimed. Releasing it appears to be left to the caller's end-of-operation
 * handling; confirm.
 */
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq) {
  int32_t code = 0;
  int8_t  status = SCH_GET_JOB_STATUS(pJob);

  if (SCH_OP_GET_STATUS == type) {
    if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
      qDebug("job not initialized or not executable job, refId:0x%" PRIx64, pJob->refId);
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_OP_EXEC != type && SCH_OP_FETCH != type) {
    SCH_JOB_ELOG("unknown operation type %d", type);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  // Claim the single operation slot; only one EXEC/FETCH may run at a time.
  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
  if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
    SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    schDirectPostJobRes(pReq, TSDB_CODE_APP_ERROR);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));

  if (SCH_OP_FETCH == type) {
    pJob->userRes.fetchRes = pReq->pFetchRes;
    pJob->userRes.fetchFp = pReq->fetchFp;
    pJob->userRes.cbParam = pReq->cbParam;
  }

  pJob->opStatus.syncReq = pReq->syncReq;
  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (SCH_OP_FETCH == type) {
    if (!SCH_JOB_NEED_FETCH(pJob)) {
      SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
      SCH_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    // Uses the status snapshot taken on entry, not a fresh read.
    if (status != JOB_TASK_STATUS_PART_SUCC) {
      SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
  }

  if (schJobNeedToStop(pJob, &status)) {
    SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
932

D
dapan1121 已提交
933 934 935 936
/*
 * Finish processing a task callback. Each step runs only if its argument is
 * present or non-zero:
 *   1. unlock pTask, if given;
 *   2. route a non-zero errCode to schHandleJobFailure();
 *   3. release the job reference held by the callback, if pJob is given.
 */
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (NULL != pTask) {
    SCH_UNLOCK_TASK(pTask);
  }

  if (TSDB_CODE_SUCCESS != errCode) {
    schHandleJobFailure(pJob, errCode);
  }

  if (NULL != pJob) {
    schReleaseJob(pJob->refId);
  }
}

H
Hongze Cheng 已提交
947
/*
 * Prepare to handle a callback for task tId of the job with ref id rId.
 *
 * The job is acquired via schAcquireJob(). Processing is refused with
 * TSDB_CODE_SCH_IGNORE_ERROR when schJobNeedToStop() says the job must stop.
 * Otherwise the task is looked up and locked.
 *
 * On success, *job holds the acquired job reference and *task is locked.
 * Pair this with schProcessOnCbEnd(), which unlocks the task and releases the
 * job reference. On failure, *job and *task are left untouched and any job
 * reference taken here is released.
 *
 * Returns TSDB_CODE_QRY_JOB_NOT_EXIST if the job cannot be acquired, otherwise
 * the first error encountered.
 */
int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId) {
  int32_t   code = 0;
  int8_t    status = 0;
  SSchTask *pTask = NULL;
  SSchJob  *pJob = schAcquireJob(rId);

  if (NULL == pJob) {
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
    SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
  }

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_ERR_JRET(schGetTaskInJob(pJob, tId, &pTask));

  // From here on nothing can fail: the locked task is handed to the caller.
  SCH_LOCK_TASK(pTask);

  *job = pJob;
  *task = pTask;

  return TSDB_CODE_SUCCESS;

_return:

  // Every jump to _return happens before SCH_LOCK_TASK, so no task lock is
  // held here. Unlocking pTask would release a lock this function never took.
  // If a failure point is ever added after the lock, unlock the task there.
  // pJob is always non-NULL here because the NULL case returned above.
  schReleaseJob(rId);

  SCH_RET(code);
}