schJob.c 29.1 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
D
dapan1121 已提交
19
#include "schInt.h"
D
dapan1121 已提交
20 21 22 23
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

D
dapan1121 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
/**
 * Record a failure code on the job.
 *
 * Rules implemented below:
 *  - TSDB_CODE_SUCCESS is never recorded.
 *  - If the job has no error yet, the new code is installed with a
 *    compare-and-swap so that only one concurrent caller wins.
 *  - If the stored error already satisfies NEED_CLIENT_HANDLE_ERROR it is kept.
 *  - Otherwise the stored error is overwritten only when the new code
 *    satisfies NEED_CLIENT_HANDLE_ERROR.
 *
 * @param pJob    job whose errCode field is updated
 * @param errCode candidate error code
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SUCCESS == errCode) {
    return;
  }

  int32_t origCode = atomic_load_32(&pJob->errCode);
  if (TSDB_CODE_SUCCESS == origCode) {
    if (origCode == atomic_val_compare_exchange_32(&pJob->errCode, origCode, errCode)) {
      goto _return;
    }

    // Lost the race: another caller stored an error first, re-read it.
    origCode = atomic_load_32(&pJob->errCode);
  }

  if (NEED_CLIENT_HANDLE_ERROR(origCode)) {
    return;
  }

  if (NEED_CLIENT_HANDLE_ERROR(errCode)) {
    atomic_store_32(&pJob->errCode, errCode);
    goto _return;
  }

  return;

_return:

  SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
}

D
dapan1121 已提交
53 54
/**
 * Tell whether the job has reached one of the FAIL, DROP or SUCC states.
 *
 * @param pJob job to inspect
 * @return true when the current status is FAIL, DROP or SUCC
 */
bool schJobDone(SSchJob *pJob) {
  int8_t status = SCH_GET_JOB_STATUS(pJob);

  return (status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_DROP || status == JOB_TASK_STATUS_SUCC);
}
D
dapan1121 已提交
58

D
dapan1121 已提交
59 60 61 62 63 64
/**
 * Decide whether work on the job should stop.
 *
 * The job must stop when it is already done (see schJobDone) or when the
 * user-supplied kill-check callback reports that the query was killed; in the
 * latter case TSDB_CODE_TSC_QUERY_KILLED is recorded on the job.
 *
 * @param pJob    job to inspect
 * @param pStatus optional out parameter; receives the status sampled on entry
 * @return true when the caller should stop processing the job
 */
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (pStatus) {
    *pStatus = status;
  }

  if (schJobDone(pJob)) {
    return true;
  }

  // The callback is copied verbatim from the request and may be absent.
  if (pJob->chkKillFp && (*pJob->chkKillFp)(pJob->chkKillParam)) {
    schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
    return true;
  }

  return false;
}

D
dapan1121 已提交
77
/**
 * Atomically move the job to a new status, enforcing the allowed transitions:
 *
 *   NULL      -> INIT
 *   INIT      -> EXEC | DROP
 *   EXEC      -> PART_SUCC | FAIL | DROP
 *   PART_SUCC -> FAIL | SUCC | DROP | EXEC
 *   SUCC/FAIL -> DROP
 *   DROP      -> (nothing; reports TSDB_CODE_QRY_JOB_FREED)
 *
 * The transition is applied with a compare-and-swap and retried when the
 * status changed concurrently between the read and the swap.
 *
 * @param pJob      job to update
 * @param newStatus requested status
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_SCH_IGNORE_ERROR when the job
 *         is already in newStatus; TSDB_CODE_QRY_JOB_FREED when the job is
 *         dropped; TSDB_CODE_APP_ERROR for any other illegal transition
 */
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;

  int8_t oriStatus = 0;

  while (true) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    if (oriStatus == newStatus) {
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        if (newStatus != JOB_TASK_STATUS_INIT) {
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
        }

        break;
      case JOB_TASK_STATUS_INIT:
        if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_DROP) {
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
        }

        break;
      case JOB_TASK_STATUS_EXEC:
        if (newStatus != JOB_TASK_STATUS_PART_SUCC && newStatus != JOB_TASK_STATUS_FAIL &&
            newStatus != JOB_TASK_STATUS_DROP) {
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
        }

        break;
      case JOB_TASK_STATUS_PART_SUCC:
        if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC &&
            newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) {
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
        }

        break;
      case JOB_TASK_STATUS_SUCC:
      case JOB_TASK_STATUS_FAIL:
        if (newStatus != JOB_TASK_STATUS_DROP) {
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
        }

        break;
      case JOB_TASK_STATUS_DROP:
        // Always leaves through _return, so control never falls into default.
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);

      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
    }

    // Status changed under us: re-validate against the fresh value.
    if (oriStatus != atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      continue;
    }

    SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));

    break;
  }

  return TSDB_CODE_SUCCESS;

_return:

  if (TSDB_CODE_SCH_IGNORE_ERROR == code) {
    SCH_JOB_DLOG("ignore job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  } else {
    SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  }

  SCH_RET(code);
}

D
dapan1121 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
/**
 * Wire up the parent/child links between the tasks of a job.
 *
 * For every task, the children and parents of its subplan are looked up in
 * planToTask (subplan pointer -> SSchTask pointer) and stored in the task's
 * `children` / `parents` arrays. Structural rules checked along the way:
 *  - a task on the level equal to pJob->levelIdx must not have children;
 *  - a task on level 0 must not have parents, every other task must have some.
 * For query jobs, level 0 must hold at most one task, and needFetch is set
 * unless that task is a MODIFY subplan running with explain disabled.
 *
 * @param pJob       job whose levels/tasks are already populated
 * @param planToTask hash from SSubplan* to SSchTask*
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR on an inconsistent
 *         plan, or TSDB_CODE_OUT_OF_MEMORY
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("children info, the %d child TID 0x%" PRIx64, n, (*childTask)->taskId);
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n indexes the parent list here, so label it as such.
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("parents info, the %d parent TID 0x%" PRIx64, n, (*parentTask)->taskId);
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob)) {
    if (pLevel->taskNum > 1) {
      SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
    if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) {
      pJob->attr.needFetch = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
245
/**
 * Remember a task in the job's dataSrcTasks array when it satisfies
 * SCH_IS_DATA_BIND_QRY_TASK; other tasks are ignored.
 *
 * @param pJob  job owning the dataSrcTasks array
 * @param pTask task to register (the pointer itself is stored)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array push fails
 */
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
    SCH_TASK_ELOG("taosArrayPush to dataSrcTasks failed, taskNum:%d", pJob->taskNum);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Validate a query plan and build the job's level/task structures from it.
 *
 * One SSchLevel is created per entry of pDag->pSubplans and one SSchTask per
 * subplan in that entry. Each task is initialised with schInitTask, offered to
 * schAppendJobDataSrc, and indexed both in a temporary subplan->task hash and
 * in pJob->taskList (by taskId). Finally the parent/child links are resolved
 * by schBuildTaskRalation. The temporary hash is always released before
 * returning once it has been created.
 *
 * @param pDag plan to schedule
 * @param pJob job being initialised; pJob->taskList must already exist
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a malformed plan,
 *         TSDB_CODE_OUT_OF_MEMORY, or an error propagated from a helper
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
  if (NULL == pJob->dataSrcTasks) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Keys are raw SSubplan pointers, so hash them as an integer of pointer width.
  SHashObj *planToTask = taosHashInit(
      pDag->numOfSubplans,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    // Report the capacity that was actually requested.
    SCH_JOB_ELOG("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_INIT;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      // Push a zeroed task first, then initialise it in place inside the array.
      SSchTask  task = {0};
      SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == pTask) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel));

      SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

H
Hongze Cheng 已提交
372
/**
 * Copy the job's execution result into pRes.
 *
 * The `res` pointer is moved, not copied: after the call pJob->execRes.res is
 * NULL and pRes->res holds the former value.
 *
 * @param pJob source job
 * @param pRes destination; must not be NULL
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
  pRes->code = atomic_load_32(&pJob->errCode);
  pRes->numOfRows = pJob->resNumOfRows;
  pRes->res = pJob->execRes.res;
  pRes->msgType = pJob->execRes.msgType;
  pRes->numOfBytes = pJob->execRes.numOfBytes;
  pJob->execRes.res = NULL;

  SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
385
/**
 * Hand the pending fetch result of the job over to the caller.
 *
 * Under the job's result lock:
 *  - marks the job as fetched;
 *  - if the pending response is flagged `completed`, switches the job to SUCC;
 *  - detaches pJob->fetchRes (it becomes NULL) and stores it in *pData;
 *  - when there was no pending response, allocates an empty response with
 *    `completed` set so the caller sees end-of-data.
 *
 * @param pJob  job to read from
 * @param pData out parameter receiving the response pointer
 * @return TSDB_CODE_SUCCESS, the error from schSwitchJobStatus, or
 *         TSDB_CODE_OUT_OF_MEMORY when the empty response cannot be allocated
 *         (in which case *pData is NULL)
 */
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
  int32_t code = 0;

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  pJob->fetched = true;

  if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) {
    SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
  }

  // Atomically take ownership of the pending response, retrying on contention.
  while (true) {
    *pData = atomic_load_ptr(&pJob->fetchRes);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->fetchRes, *pData, NULL)) {
      continue;
    }

    break;
  }

  if (NULL == *pData) {
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
    if (NULL == rsp) {
      // Do not report success with a NULL result; *pData stays NULL.
      SCH_JOB_ELOG("calloc %d failed for empty fetch rsp", (int32_t)sizeof(SRetrieveTableRsp));
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    rsp->completed = 1;

    *pData = rsp;
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
  }

  SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows);

_return:

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return code;
}

H
Hongze Cheng 已提交
424 425
/**
 * Invoke the user's exec callback with the job's execution result.
 *
 * A fresh SExecResult is allocated and filled via schDumpJobExecRes; if the
 * allocation fails the callback is still invoked, with a NULL result.
 *
 * @param pJob job whose userRes.execFp is called
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schNotifyUserExecRes(SSchJob *pJob) {
  SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult));
  if (pRes) {
    schDumpJobExecRes(pJob, pRes);
  }

  // errCode is updated atomically elsewhere, so read it the same way everywhere.
  SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(atomic_load_32(&pJob->errCode)));
  (*pJob->userRes.execFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
  SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(atomic_load_32(&pJob->errCode)));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
437 438 439
/**
 * Invoke the user's fetch callback with the job's pending fetch result.
 *
 * The result is obtained through schDumpJobFetchRes; its return code is not
 * inspected here, so the callback may receive a NULL result.
 *
 * @param pJob job whose userRes.fetchFp is called
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schNotifyUserFetchRes(SSchJob *pJob) {
  void *pRes = NULL;

  schDumpJobFetchRes(pJob, &pRes);

  // errCode is updated atomically elsewhere, so read it the same way everywhere.
  SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(atomic_load_32(&pJob->errCode)));
  (*pJob->userRes.fetchFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
  SCH_JOB_DLOG("sch end from fetch cb, code: %s", tstrerror(atomic_load_32(&pJob->errCode)));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
449
/**
 * Deliver the job result to whoever is waiting for the current operation.
 *
 * Nothing is delivered when the job is not in any operation, or when `op` is
 * non-zero and differs from the operation in progress. Otherwise, depending on
 * the operation mode, the response semaphore is posted (sync), or the user's
 * exec / fetch callback is invoked (async). The opStatus lock is always
 * released before posting or calling back.
 *
 * @param pJob job whose result is posted
 * @param op   expected operation, or 0 to accept whatever is in progress
 */
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (SCH_OP_NULL == pJob->opStatus.op) {
    SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
    goto _return;
  }

  if (op && pJob->opStatus.op != op) {
    SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
    goto _return;
  }

  if (SCH_JOB_IN_SYNC_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    tsem_post(&pJob->rspSem);
  } else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    schNotifyUserExecRes(pJob);
  } else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    schNotifyUserFetchRes(pJob);
  } else {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
  }

  return;

_return:

  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
}

D
dapan1121 已提交
483
/**
 * React to a job failure: record the error and notify the waiter.
 *
 * TSDB_CODE_SCH_IGNORE_ERROR is passed through untouched. For any other code
 * the job's errCode is updated (see schUpdateJobErrCode) and the result is
 * posted for whatever operation is in progress.
 *
 * @param pJob    failed job
 * @param errCode failure reason
 * @return always TSDB_CODE_SCH_IGNORE_ERROR
 */
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  schUpdateJobErrCode(pJob, errCode);

  // Log the code that actually ended up stored, which may differ from errCode.
  int32_t code = atomic_load_32(&pJob->errCode);
  if (code) {
    SCH_JOB_DLOG("job failed with error %s", tstrerror(code));
  }

  schPostJobRes(pJob, 0);

  SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}

D
dapan1121 已提交
500
/**
 * Switch the job to the FAIL status because of errCode.
 *
 * TSDB_CODE_SCH_IGNORE_ERROR is passed through without touching the job. The
 * result of schSwitchJobStatus is not propagated.
 *
 * @param pJob    job to fail
 * @param errCode failure reason handed to schSwitchJobStatus
 * @return always TSDB_CODE_SCH_IGNORE_ERROR
 */
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);
  return TSDB_CODE_SCH_IGNORE_ERROR;
}

H
Hongze Cheng 已提交
509
/**
 * Handle a dropped job; delegates entirely to schProcessOnJobFailure.
 *
 * @param pJob    dropped job
 * @param errCode reason passed on to schProcessOnJobFailure
 * @return whatever schProcessOnJobFailure returns
 */
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
D
dapan1121 已提交
510 511 512 513 514 515 516 517

/**
 * Switch the job to the DROP status because of errCode.
 *
 * TSDB_CODE_SCH_IGNORE_ERROR is passed through without touching the job. The
 * result of schSwitchJobStatus is not propagated.
 *
 * @param pJob    job to drop
 * @param errCode reason handed to schSwitchJobStatus
 * @return always TSDB_CODE_SCH_IGNORE_ERROR
 */
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);
  return TSDB_CODE_SCH_IGNORE_ERROR;
}

H
Hongze Cheng 已提交
520
/**
 * React to the job reaching partial success.
 *
 * If a fetch operation is in progress (sync or async), the fetch task is
 * launched; otherwise the result is posted to the current operation's waiter.
 *
 * @param pJob job that became partially successful
 * @return TSDB_CODE_SUCCESS or the error from schLaunchFetchTask
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  // sync == -1 means "do not care whether the fetch is sync or async".
  if (schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
    SCH_ERR_RET(schLaunchFetchTask(pJob));
  } else {
    schPostJobRes(pJob, 0);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
530
/**
 * Fetched data is available: post the job result for the FETCH operation.
 *
 * @param pJob job whose data was fetched
 */
void schProcessOnDataFetched(SSchJob *pJob) {
  schPostJobRes(pJob, SCH_OP_FETCH);
}
D
dapan1121 已提交
531

D
dapan1121 已提交
532
/**
 * Store the explain response on the job and mark the task as succeeded.
 *
 * The row count is byte-swapped with htobe64 before being stored; the response
 * pointer itself becomes the job's pending fetch result. For non-insert jobs
 * the fetch waiter is notified.
 *
 * @param pJob  job receiving the explain result
 * @param pTask task that produced it
 * @param pRsp  explain response; stored in pJob->fetchRes
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", htobe64(pRsp->numOfRows), pRsp->completed);

  atomic_store_64(&pJob->resNumOfRows, htobe64(pRsp->numOfRows));
  atomic_store_ptr(&pJob->fetchRes, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);

  if (!SCH_IS_INSERT_JOB(pJob)) {
    schProcessOnDataFetched(pJob);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
547
/**
 * After a task finishes, launch the next level once its whole level is done.
 *
 * Only applies to query jobs. The finished task's level counter is bumped;
 * when it reaches the level's task count, pJob->levelIdx is decremented and
 * every task of that level which has no children is launched.
 *
 * @param pJob  job being executed
 * @param pTask task that just finished
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR when the new level
 *         index does not resolve to a level, or the error from schLaunchTask
 */
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  SSchLevel *pLevel = pTask->level;
  int32_t    doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1);
  if (doneNum != pLevel->taskNum) {
    return TSDB_CODE_SUCCESS;
  }

  pJob->levelIdx--;

  pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
  if (NULL == pLevel) {
    // Guard against running off the level array instead of dereferencing NULL.
    SCH_JOB_ELOG("no level to launch, levelIdx:%d", pJob->levelIdx);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  for (int32_t i = 0; i < pLevel->taskNum; ++i) {
    SSchTask *pNext = taosArrayGet(pLevel->subTasks, i);

    // Tasks that have children are skipped here.
    if (pNext->children && taosArrayGetSize(pNext->children) > 0) {
      continue;
    }

    SCH_ERR_RET(schLaunchTask(pJob, pNext));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
572
/**
 * Append the table version info carried by a query response to the job's
 * execution result.
 *
 * Responses with an empty table name are ignored. The result array is created
 * lazily, sized by the job's task count, and all updates happen under the
 * job's result lock.
 *
 * @param pJob job collecting the result
 * @param rsp  query response providing tbFName / sversion / tversion
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY
 */
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
  if (0 == rsp->tbFName[0]) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t    code = TSDB_CODE_SUCCESS;
  STbVerInfo tbInfo = {0};

  // Bounded copy; the zero-initialised buffer keeps the trailing NUL intact.
  // NOTE(review): relies on STbVerInfo.tbFName being a char array - confirm.
  strncpy(tbInfo.tbFName, rsp->tbFName, sizeof(tbInfo.tbFName) - 1);
  tbInfo.sversion = rsp->sversion;
  tbInfo.tversion = rsp->tversion;

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  if (NULL == pJob->execRes.res) {
    pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
    if (NULL == pJob->execRes.res) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _return;
    }
  }

  if (NULL == taosArrayPush((SArray *)pJob->execRes.res, &tbInfo)) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }

  pJob->execRes.msgType = TDMT_SCH_QUERY;

_return:

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  SCH_RET(code);
}

598
/**
 * Look up a task of the job by its id.
 *
 * @param pJob   job whose taskList is searched
 * @param taskId id of the task
 * @param pTask  out parameter filled by schGetTaskFromList
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when *pTask is
 *         NULL after the lookup
 */
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
  schGetTaskFromList(pJob->taskList, taskId, pTask);
  if (NULL == *pTask) {
    SCH_JOB_ELOG("task not found in job task list, taskId:0x%" PRIx64, taskId);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
608 609
/**
 * Start executing the job.
 *
 * In static-explain mode the explain output is produced locally, stored as the
 * job's fetch result, and the job is switched to PART_SUCC. Otherwise the
 * tasks of the level at pJob->levelIdx are launched.
 *
 * @param pJob job to launch
 * @return TSDB_CODE_SUCCESS or the first error from a helper
 */
int32_t schLaunchJob(SSchJob *pJob) {
  if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) {
    SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
    SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
  } else {
    SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
    SCH_ERR_RET(schLaunchLevelTasks(pJob, level));
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Drop the tasks tracked in the job's execTasks hash.
 *
 * @param pJob job whose executing tasks are dropped
 */
void schDropJobAllTasks(SSchJob *pJob) {
  schDropTaskInHashList(pJob, pJob->execTasks);
}

void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
  int64_t  refId = pJob->refId;

D
dapan1121 已提交
635 636
  qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);

D
dapan1121 已提交
637 638 639 640 641 642 643 644 645
  schDropJobAllTasks(pJob);

  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
  for (int32_t i = 0; i < numOfLevels; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
646
      schFreeTask(pJob, pTask);
D
dapan1121 已提交
647 648 649 650 651 652 653 654
    }

    taosArrayDestroy(pLevel->subTasks);
  }

  schFreeFlowCtrl(pJob);

  taosHashCleanup(pJob->execTasks);
H
Hongze Cheng 已提交
655 656
  //  taosHashCleanup(pJob->failTasks);
  //  taosHashCleanup(pJob->succTasks);
D
dapan1121 已提交
657
  taosHashCleanup(pJob->taskList);
H
Hongze Cheng 已提交
658

D
dapan1121 已提交
659 660 661 662 663 664
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
  taosArrayDestroy(pJob->dataSrcTasks);

  qExplainFreeCtx(pJob->explainCtx);

D
dapan1121 已提交
665
  destroyQueryExecRes(&pJob->execRes);
D
dapan1121 已提交
666

D
dapan1121 已提交
667
  qDestroyQueryPlan(pJob->pDag);
668
  nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);
D
dapan1121 已提交
669

D
dapan1121 已提交
670
  taosMemoryFreeClear(pJob->userRes.execRes);
D
dapan1121 已提交
671
  taosMemoryFreeClear(pJob->fetchRes);
D
dapan1121 已提交
672
  taosMemoryFreeClear(pJob->sql);
D
dapan1121 已提交
673
  tsem_destroy(&pJob->rspSem);
D
dapan1121 已提交
674
  taosMemoryFree(pJob);
D
dapan1121 已提交
675

D
dapan1121 已提交
676 677 678 679
  int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
  if (jobNum == 0) {
    schCloseJobRef();
  }
D
dapan1121 已提交
680 681

  qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
682 683
}

D
dapan1121 已提交
684
/**
 * Fetch the next batch of rows for the job.
 *
 * Unless the job is a static explain or an explain of an insert, the fetch
 * task is launched; for a synchronous fetch the call then blocks on the
 * response semaphore and dumps the result into pJob->userRes.fetchRes.
 * For the explain cases no task is launched: a synchronous fetch dumps the
 * already available result, an asynchronous one posts it to the callback.
 *
 * @param pJob job to fetch from
 * @return TSDB_CODE_SUCCESS or the first error from a helper
 */
int32_t schJobFetchRows(SSchJob *pJob) {
  int32_t code = 0;

  if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC) && !(SCH_IS_EXPLAIN_JOB(pJob) && SCH_IS_INSERT_JOB(pJob))) {
    SCH_ERR_RET(schLaunchFetchTask(pJob));

    if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
      SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
      tsem_wait(&pJob->rspSem);

      SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
    }
  } else {
    if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
      SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
    } else {
      schPostJobRes(pJob, SCH_OP_FETCH);
    }
  }

  SCH_RET(code);
}

D
dapan1121 已提交
706
/**
 * Create a scheduler job from a request and register it in the job ref set.
 *
 * The job takes over pReq->pDag: on every failure path the plan is destroyed,
 * either directly (allocation of the job itself failed) or through
 * schFreeJobImpl / the ref set's release of the job.
 *
 * @param pJobId out parameter receiving the job's reference id on success
 * @param pReq   scheduler request describing the plan, callbacks and options
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or an error propagated
 *         from plan validation / explain setup / taosAddRef
 */
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
  int32_t  code = 0;
  // Stays negative until taosAddRef succeeds. The cleanup path must not rely on
  // pJob->refId for this because the job is calloc'ed and that field starts at 0.
  int64_t  refId = -1;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Take ownership of the plan first so every later failure releases it.
  pJob->pDag = pReq->pDag;
  pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
  pJob->attr.localExec = pReq->localReq;
  pJob->conn = *pReq->pConn;
  pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
  pJob->chkKillFp = pReq->chkKillFp;
  pJob->chkKillParam = pReq->chkKillParam;
  pJob->userRes.execFp = pReq->execFp;
  pJob->userRes.cbParam = pReq->cbParam;

  if (pReq->sql) {
    pJob->sql = taosStrdup(pReq->sql);
    if (NULL == pJob->sql) {
      qError("QID:0x%" PRIx64 " strdup sql failed", pReq->pDag->queryId);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
  } else {
    pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
    if (NULL == pJob->nodeList) {
      qError("QID:0x%" PRIx64 " dup nodeList failed", pReq->pDag->queryId);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                HASH_ENTRY_LOCK);
  if (NULL == pJob->taskList) {
    SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
  }

  pJob->execTasks = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                 HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (tsem_init(&pJob->rspSem, 0, 0)) {
    SCH_JOB_ELOG("tsem_init failed, errno:%d", errno);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  refId = taosAddRef(schMgmt.jobRef, pJob);
  pJob->refId = refId;
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  *pJobId = pJob->refId;

  SCH_JOB_DLOG("job refId:0x%" PRIx64 " created", pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  if (NULL == pJob) {
    qDestroyQueryPlan(pReq->pDag);
  } else if (refId < 0) {
    // Never registered in the ref set: free directly.
    // NOTE(review): schFreeJobImpl decrements schMgmt.jobNum although this path
    // never incremented it - confirm whether the counter needs balancing.
    schFreeJobImpl(pJob);
  } else {
    taosRemoveRef(schMgmt.jobRef, refId);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
786 787
/**
 * Launch the job and, for synchronous requests, wait for its response.
 *
 * @param pJob job to execute
 * @param pReq originating request; syncReq selects blocking behaviour
 * @return TSDB_CODE_SUCCESS or the error from schLaunchJob
 */
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
  int32_t code = 0;
  qDebug("QID:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId);

  SCH_ERR_RET(schLaunchJob(pJob));

  if (pReq->syncReq) {
    SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
802
/**
 * Deliver an error code straight to the caller-supplied callback of an
 * asynchronous request, without going through a job object.
 *
 * Nothing happens when pReq is NULL or the request is synchronous. Otherwise
 * the exec callback is preferred; the fetch callback is used only when no exec
 * callback is set. The callback receives NULL as its first argument.
 *
 * @param pReq     scheduler request carrying execFp / fetchFp / cbParam; may be NULL
 * @param errCode  code forwarded to the callback
 */
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
  // Sync requests (and missing requests) have no callback to notify.
  if (NULL == pReq || pReq->syncReq) {
    return;
  }

  if (NULL != pReq->execFp) {
    pReq->execFp(NULL, pReq->cbParam, errCode);
    return;
  }

  if (NULL != pReq->fetchFp) {
    pReq->fetchFp(NULL, pReq->cbParam, errCode);
  }
}
D
dapan1121 已提交
813

D
dapan1121 已提交
814 815
/**
 * Check, under the op-status read lock, whether the job is currently in the
 * given operation.
 *
 * @param pJob  job whose opStatus is inspected
 * @param op    operation to compare against pJob->opStatus.op
 * @param sync  when >= 0, pJob->opStatus.syncReq must also equal this value;
 *              when negative, the sync flag is not compared
 * @return true when the operation (and, if requested, the sync flag) matches
 */
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
  bool matched = false;

  SCH_LOCK(SCH_READ, &pJob->opStatus.lock);
  matched = (pJob->opStatus.op == op);
  if (matched && sync >= 0) {
    // Caller also cares about sync vs. async mode of the in-flight operation.
    matched = (pJob->opStatus.syncReq == sync);
  }
  SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);

  return matched;
}

H
Hongze Cheng 已提交
827
/**
 * Finish a user-facing operation on a job.
 *
 * For synchronous EXEC and FETCH requests the job's current-operation marker is
 * swapped back from `type` to SCH_OP_NULL under the op-status write lock; a
 * mismatch is only logged. A synchronous EXEC additionally dumps the execution
 * result into pReq->pExecRes. For SCH_OP_GET_STATUS the incoming error code is
 * overwritten with TSDB_CODE_SUCCESS. Any remaining non-zero error code is
 * passed to schHandleJobFailure.
 *
 * @param pJob     job the operation ran on
 * @param type     operation being ended
 * @param pReq     originating request; may be NULL
 * @param errCode  result code of the operation
 */
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode) {
  int32_t op = 0;

  if (SCH_OP_GET_STATUS == type) {
    // A status query never fails the job, whatever code it ended with.
    errCode = TSDB_CODE_SUCCESS;
  } else if ((SCH_OP_EXEC == type || SCH_OP_FETCH == type) && pReq && pReq->syncReq) {
    // Sync EXEC/FETCH: release the operation slot claimed when the op began.
    SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
    op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
    if (SCH_OP_NULL == op || op != type) {
      SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op),
                   jobTaskStatusStr(pJob->status));
    }
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

    if (SCH_OP_EXEC == type) {
      schDumpJobExecRes(pJob, pReq->pExecRes);
    }
  }

  if (errCode) {
    schHandleJobFailure(pJob, errCode);
  }

  SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
}

H
Hongze Cheng 已提交
868
/**
 * Begin a user-facing operation on a job.
 *
 * - SCH_OP_GET_STATUS: only validates that the job looks initialized
 *   (status, levelNum, levels) and returns without claiming the operation slot.
 * - SCH_OP_EXEC / SCH_OP_FETCH: under the op-status write lock, swaps the
 *   current-operation marker from SCH_OP_NULL to `type`. If another operation
 *   is already recorded, the error is posted via schDirectPostJobRes and
 *   TSDB_CODE_APP_ERROR is raised. On success the request's sync flag is
 *   recorded; FETCH also records the fetch result slot, callback and callback
 *   parameter, then verifies that the job needs fetching and is in
 *   JOB_TASK_STATUS_PART_SUCC.
 * - Any other type is rejected with TSDB_CODE_APP_ERROR.
 *
 * Finally, the operation is aborted with TSDB_CODE_SCH_IGNORE_ERROR when
 * schJobNeedToStop reports the job should stop.
 *
 * @param pJob  job to operate on
 * @param type  operation being started
 * @param pReq  originating request; dereferenced for EXEC and FETCH
 * @return TSDB_CODE_SUCCESS, or the error code raised through SCH_ERR_RET
 */
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq) {
  int32_t code = 0;
  // Sampled once at entry, before the op lock is taken; the FETCH status check
  // below uses this snapshot.
  int8_t status = SCH_GET_JOB_STATUS(pJob);

  if (SCH_OP_GET_STATUS == type) {
    if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
      qDebug("job not initialized or not executable job, refId:0x%" PRIx64, pJob->refId);
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_OP_EXEC != type && SCH_OP_FETCH != type) {
    SCH_JOB_ELOG("unknown operation type %d", type);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  // EXEC and FETCH share the same claim protocol on the operation slot.
  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
  if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
    SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    schDirectPostJobRes(pReq, TSDB_CODE_APP_ERROR);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));

  if (SCH_OP_FETCH == type) {
    // Remember where and how to deliver the fetched data.
    pJob->userRes.fetchRes = pReq->pFetchRes;
    pJob->userRes.fetchFp = pReq->fetchFp;
    pJob->userRes.cbParam = pReq->cbParam;
  }

  pJob->opStatus.syncReq = pReq->syncReq;
  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (SCH_OP_FETCH == type) {
    if (!SCH_JOB_NEED_FETCH(pJob)) {
      SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
      SCH_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    if (status != JOB_TASK_STATUS_PART_SUCC) {
      SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
  }

  if (schJobNeedToStop(pJob, &status)) {
    SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
934

D
dapan1121 已提交
935 936 937 938
/**
 * Finish processing of a task callback: unlock the task, report a failure on
 * the job if the callback ended with an error, and drop the job reference.
 *
 * Both pointers are optional. The original code already guarded the job
 * release against a NULL pJob but still passed a NULL pJob to
 * schHandleJobFailure; the failure handling is now guarded the same way so a
 * missing job is never handed to it.
 *
 * @param pJob     job the callback belongs to; may be NULL
 * @param pTask    task that was locked for the callback; may be NULL
 * @param errCode  result of the callback handling; non-zero marks a failure
 */
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (pTask) {
    SCH_UNLOCK_TASK(pTask);
  }

  // Without a job there is nothing to fail and no reference to release.
  if (NULL == pJob) {
    return;
  }

  if (errCode) {
    schHandleJobFailure(pJob, errCode);
  }

  schReleaseJob(pJob->refId);
}

H
Hongze Cheng 已提交
949
/**
 * Prepare for handling a task callback: acquire the job by reference id, make
 * sure the job is still worth processing, look up the task and lock it.
 *
 * On success the acquired job and the locked task are handed back through
 * `job` and `task`. On failure the outputs are left untouched and any job
 * reference taken here is released again.
 *
 * @param job   out: acquired job (written only on success)
 * @param task  out: locked task (written only on success)
 * @param qId   query id, used for logging only
 * @param rId   job reference id passed to schAcquireJob / schReleaseJob
 * @param tId   task id passed to schGetTaskInJob
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_QRY_JOB_NOT_EXIST when the
 *         job cannot be acquired; TSDB_CODE_SCH_IGNORE_ERROR when
 *         schJobNeedToStop reports the job should stop; otherwise the code
 *         from schGetTaskInJob
 */
int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId) {
  int32_t code = 0;
  int8_t  status = 0;

  SSchTask *pTask = NULL;
  SSchJob  *pJob = schAcquireJob(rId);
  if (NULL == pJob) {
    // Separator added after the TID value so it no longer runs into the message text.
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
    SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
  }

  if (schJobNeedToStop(pJob, &status)) {
    // NOTE(review): pTask is still NULL here; this relies on SCH_TASK_DLOG
    // tolerating a NULL task - confirm against the macro definition.
    SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_ERR_JRET(schGetTaskInJob(pJob, tId, &pTask));

  SCH_LOCK_TASK(pTask);

  *job = pJob;
  *task = pTask;

  return TSDB_CODE_SUCCESS;

_return:

  if (pTask) {
    SCH_UNLOCK_TASK(pTask);
  }
  if (pJob) {
    schReleaseJob(rId);
  }

  SCH_RET(code);
}