schJob.c 28.8 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
D
dapan1121 已提交
19
#include "schInt.h"
D
dapan1121 已提交
20 21 22 23
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

D
dapan1121 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SUCCESS == errCode) {
    return;
  }

  int32_t origCode = atomic_load_32(&pJob->errCode);
  if (TSDB_CODE_SUCCESS == origCode) {
    if (origCode == atomic_val_compare_exchange_32(&pJob->errCode, origCode, errCode)) {
      goto _return;
    }

    origCode = atomic_load_32(&pJob->errCode);
  }

  if (NEED_CLIENT_HANDLE_ERROR(origCode)) {
    return;
  }

  if (NEED_CLIENT_HANDLE_ERROR(errCode)) {
    atomic_store_32(&pJob->errCode, errCode);
    goto _return;
  }

  return;

_return:
50
  SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
D
dapan1121 已提交
51 52
}

D
dapan1121 已提交
53 54
// Returns true when the job status is FAIL, DROP or SUCC.
bool schJobDone(SSchJob *pJob) {
  switch (SCH_GET_JOB_STATUS(pJob)) {
    case JOB_TASK_STATUS_FAIL:
    case JOB_TASK_STATUS_DROP:
    case JOB_TASK_STATUS_SUCC:
      return true;
    default:
      return false;
  }
}
D
dapan1121 已提交
58

D
dapan1121 已提交
59 60 61 62 63 64
// Tells the caller whether work on pJob should stop.
//
// pStatus: optional out-parameter; when non-NULL it receives the job status read on entry.
// Returns true when the job is already done (see schJobDone) or when the kill-check callback
// reports the query as killed, in which case TSDB_CODE_TSC_QUERY_KILLED is recorded on the job.
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (pStatus) {
    *pStatus = status;
  }

  if (schJobDone(pJob)) {
    return true;
  }

  // chkKillFp is copied from the request without validation, so it may be NULL.
  if (pJob->chkKillFp && (*pJob->chkKillFp)(pJob->chkKillParam)) {
    schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
    return true;
  }

  return false;
}

D
dapan1121 已提交
77
// Moves the job status to newStatus if the transition from the current status is allowed.
//
// The current status is re-read and the compare-and-swap retried until it succeeds or the
// transition is rejected. Returns TSDB_CODE_SUCCESS on a successful switch,
// TSDB_CODE_SCH_IGNORE_ERROR when the job is already in newStatus, TSDB_CODE_QRY_JOB_FREED when
// the job is in DROP status, and TSDB_CODE_QRY_APP_ERROR for any other disallowed transition.
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  curStatus = 0;

  for (;;) {
    curStatus = SCH_GET_JOB_STATUS(pJob);
    if (curStatus == newStatus) {
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    bool allowed = false;
    switch (curStatus) {
      case JOB_TASK_STATUS_NULL:
        allowed = (JOB_TASK_STATUS_INIT == newStatus);
        break;
      case JOB_TASK_STATUS_INIT:
        allowed = (JOB_TASK_STATUS_EXEC == newStatus || JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_EXEC:
        allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus || JOB_TASK_STATUS_FAIL == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_PART_SUCC:
        allowed = (JOB_TASK_STATUS_FAIL == newStatus || JOB_TASK_STATUS_SUCC == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus || JOB_TASK_STATUS_EXEC == newStatus);
        break;
      case JOB_TASK_STATUS_SUCC:
      case JOB_TASK_STATUS_FAIL:
        allowed = (JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_DROP:
        // NOTE(review): SCH_ERR_JRET is expected to jump to _return for a non-zero code,
        // so the default label below is not reached from here - confirm against the macro.
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);

      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(curStatus));
        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    if (!allowed) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    // Another thread may have changed the status since it was read; retry in that case.
    if (curStatus == atomic_val_compare_exchange_8(&pJob->status, curStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(curStatus), jobTaskStatusStr(newStatus));
      return TSDB_CODE_SUCCESS;
    }
  }

_return:

  if (TSDB_CODE_SCH_IGNORE_ERROR != code) {
    SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(curStatus), jobTaskStatusStr(newStatus));
  } else {
    SCH_JOB_DLOG("ignore job status update, from %s to %s", jobTaskStatusStr(curStatus), jobTaskStatusStr(newStatus));
  }

  SCH_RET(code);
}

D
dapan1121 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
// Wires up the parent/child links between the tasks of every level of pJob.
//
// For each task, the entries of its subplan's pChildren / pParents lists are resolved to tasks
// through planToTask (keyed by the SSubplan pointer) and stored in pTask->children /
// pTask->parents. Once all tasks are linked, a query job must have at most one task at level 0;
// attr.needFetch is set unless that task's subplan type is SUBPLAN_TYPE_MODIFY.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR when the plan is inconsistent, or
// TSDB_CODE_QRY_OUT_OF_MEMORY when an array cannot be created or extended.
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        // Tasks on the level with the highest index must not have children.
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("children info, the %d child TID 0x%" PRIx64, n, (*childTask)->taskId);
      }

      if (parentNum > 0) {
        // Level 0 tasks must not have parents ...
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      } else {
        // ... and every task on any other level must have at least one.
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("parents info, the %d parent TID 0x%" PRIx64, n, (*parentTask)->taskId);
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob)) {
    if (pLevel->taskNum > 1) {
      SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
    if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType) {
      pJob->attr.needFetch = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
245
// Appends pTask to pJob->dataSrcTasks when SCH_IS_DATA_BIND_QRY_TASK(pTask) holds.
//
// Returns TSDB_CODE_SUCCESS when the task is skipped or appended, and
// TSDB_CODE_OUT_OF_MEMORY when the array push fails.
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  // The array stores the task pointer itself, hence &pTask.
  if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
    SCH_TASK_ELOG("taosArrayPush task to dataSrcTasks failed, taskId:0x%" PRIx64, pTask->taskId);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

// Validates the query plan pDag and builds the level/task structures of pJob from it.
//
// One SSchLevel is created per entry of pDag->pSubplans and one SSchTask per subplan in that
// entry. Every task is initialized with schInitTask, offered to schAppendJobDataSrc and
// registered in pJob->taskList (keyed by taskId). A temporary subplan-pointer -> task map is
// built along the way and handed to schBuildTaskRalation to link parents and children; the map
// is destroyed before returning.
//
// Returns TSDB_CODE_SUCCESS or the first error code encountered.
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
  if (NULL == pJob->dataSrcTasks) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Keys are SSubplan pointers, so pick the hash function matching the pointer width.
  SHashObj *planToTask = taosHashInit(
      pDag->numOfSubplans,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    // Report the capacity that was actually requested.
    SCH_JOB_ELOG("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_INIT;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      // Push a zeroed task first; pTask then points at the element stored in the array.
      SSchTask  task = {0};
      SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == pTask) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel));

      SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:

  // The map is only needed while building the relations.
  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

H
Hongze Cheng 已提交
372
// Copies the job's execution result into pRes and hands over the result payload.
//
// pRes receives the job error code, the row count and the msgType/numOfBytes/res fields of
// pJob->execRes. The job's own res pointer is cleared afterwards. Always returns
// TSDB_CODE_SUCCESS.
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
  pRes->code = atomic_load_32(&pJob->errCode);
  pRes->numOfRows = pJob->resNumOfRows;

  pRes->res = pJob->execRes.res;
  pRes->msgType = pJob->execRes.msgType;
  pRes->numOfBytes = pJob->execRes.numOfBytes;

  // The payload now belongs to pRes; drop the job's reference to it.
  pJob->execRes.res = NULL;

  SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
385
// Hands the job's pending fetch result to the caller through *pData.
//
// Runs under the job's resLock (write). Marks the job as fetched; if the pending response is
// flagged completed, the job is first switched to JOB_TASK_STATUS_SUCC. The pending response
// pointer is then taken out of pJob->fetchRes (leaving NULL behind). When nothing is pending,
// an empty response with completed = 1 is allocated and returned instead.
//
// Returns TSDB_CODE_SUCCESS, the error of the status switch, or TSDB_CODE_OUT_OF_MEMORY when
// the empty response cannot be allocated (*pData is NULL in that case).
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
  int32_t code = 0;

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  pJob->fetched = true;

  if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) {
    SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
  }

  // Take the pending response and leave NULL behind; retry if fetchRes changed in between.
  while (true) {
    *pData = atomic_load_ptr(&pJob->fetchRes);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->fetchRes, *pData, NULL)) {
      continue;
    }

    break;
  }

  if (NULL == *pData) {
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
    if (NULL == rsp) {
      // Do not report success with a NULL result.
      SCH_JOB_ELOG("calloc %d bytes for empty fetch rsp failed", (int32_t)sizeof(SRetrieveTableRsp));
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    rsp->completed = 1;
    *pData = rsp;
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
  }

  SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows);

_return:

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return code;
}

H
Hongze Cheng 已提交
424 425
// Invokes the user's exec callback with a freshly allocated copy of the execution result.
//
// If the result object cannot be allocated, the callback is still invoked with a NULL result.
// Always returns TSDB_CODE_SUCCESS.
int32_t schNotifyUserExecRes(SSchJob *pJob) {
  SExecResult *pExecRes = taosMemoryCalloc(1, sizeof(SExecResult));
  if (NULL != pExecRes) {
    schDumpJobExecRes(pJob, pExecRes);
  }

  SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
  (*pJob->userRes.execFp)(pExecRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
  SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
437 438 439
// Invokes the user's fetch callback with the result obtained from schDumpJobFetchRes.
//
// The return value of schDumpJobFetchRes is not inspected; the callback receives whatever
// pointer it produced together with the job's current error code.
// Always returns TSDB_CODE_SUCCESS.
int32_t schNotifyUserFetchRes(SSchJob *pJob) {
  void *pFetched = NULL;
  schDumpJobFetchRes(pJob, &pFetched);

  SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode));
  (*pJob->userRes.fetchFp)(pFetched, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
  SCH_JOB_DLOG("sch end from fetch cb, code: %s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
449
// Delivers the job result to whoever is waiting for the current operation.
//
// op: expected operation; 0 skips the check. Nothing is posted when the job is not in any
// operation or when the current operation differs from a non-zero op.
// The kind of waiter is determined while opStatus.lock is held (write); the lock is released
// before the semaphore is posted or a user callback is invoked.
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (SCH_OP_NULL == pJob->opStatus.op) {
    SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  if (op && pJob->opStatus.op != op) {
    SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  // Classify under the lock, in the same order as the checks are prioritised.
  bool inSyncOp = SCH_JOB_IN_SYNC_OP(pJob);
  bool inAsyncExec = !inSyncOp && SCH_JOB_IN_ASYNC_EXEC_OP(pJob);
  bool inAsyncFetch = !inSyncOp && !inAsyncExec && SCH_JOB_IN_ASYNC_FETCH_OP(pJob);

  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (inSyncOp) {
    tsem_post(&pJob->rspSem);
  } else if (inAsyncExec) {
    schNotifyUserExecRes(pJob);
  } else if (inAsyncFetch) {
    schNotifyUserFetchRes(pJob);
  } else {
    SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
  }
}

D
dapan1121 已提交
483
// Records errCode on the job and posts the job result to the waiting operation.
//
// Returns TSDB_CODE_SCH_IGNORE_ERROR in all cases.
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  schUpdateJobErrCode(pJob, errCode);

  // Log the code the job ended up with, which may differ from errCode.
  int32_t jobCode = atomic_load_32(&pJob->errCode);
  if (0 != jobCode) {
    SCH_JOB_DLOG("job failed with error %s", tstrerror(jobCode));
  }

  schPostJobRes(pJob, 0);

  SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}

D
dapan1121 已提交
496
// Switches the job to JOB_TASK_STATUS_FAIL unless errCode is TSDB_CODE_SCH_IGNORE_ERROR.
//
// The result of the status switch is not inspected. Always returns TSDB_CODE_SCH_IGNORE_ERROR.
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}

H
Hongze Cheng 已提交
505
// Handles a dropped job exactly like a failed one (delegates to schProcessOnJobFailure).
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
D
dapan1121 已提交
506 507 508 509 510 511 512 513

// Switches the job to JOB_TASK_STATUS_DROP unless errCode is TSDB_CODE_SCH_IGNORE_ERROR.
//
// The result of the status switch is not inspected. Always returns TSDB_CODE_SCH_IGNORE_ERROR.
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}

H
Hongze Cheng 已提交
516
// Continues the job after it reached partial success.
//
// When the current operation is a fetch (sync or async), the fetch task is launched and its
// error, if any, is returned. Otherwise the job result is posted to the waiting operation.
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  if (!schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
    schPostJobRes(pJob, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
526
// Posts the job result to the operation waiting on a fetch.
void schProcessOnDataFetched(SSchJob *pJob) {
  schPostJobRes(pJob, SCH_OP_FETCH);
}
D
dapan1121 已提交
527

D
dapan1121 已提交
528 529
// Stores an explain response as the job's fetch result and notifies the fetch waiter.
//
// The row count is passed through htonl before it is stored. pRsp itself becomes the job's
// pending fetchRes, pTask is marked JOB_TASK_STATUS_SUCC, and the result is posted.
// Always returns TSDB_CODE_SUCCESS.
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);

  atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
  atomic_store_ptr(&pJob->fetchRes, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
541
// Counts pTask as done on its level and, once the whole level is done, launches the next one.
//
// Only query jobs are handled; for other jobs this is a no-op. When the done counter of
// pTask's level reaches the level's taskNum, pJob->levelIdx is decremented and every task on
// that level which has no children is launched with schLaunchTask.
//
// Returns TSDB_CODE_SUCCESS, the error of a failed launch, or TSDB_CODE_SCH_INTERNAL_ERROR
// when there is no level left to move to.
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  SSchLevel *pLevel = pTask->level;
  int32_t    doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1);
  if (doneNum != pLevel->taskNum) {
    return TSDB_CODE_SUCCESS;
  }

  // Refuse to step below level 0 instead of indexing the level array with -1.
  if (pJob->levelIdx <= 0) {
    SCH_JOB_ELOG("no more level to launch, levelIdx:%d", (int32_t)pJob->levelIdx);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  pJob->levelIdx--;

  SSchLevel *pNextLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
  if (NULL == pNextLevel) {
    SCH_JOB_ELOG("level not found in job levels, levelIdx:%d", (int32_t)pJob->levelIdx);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  for (int32_t i = 0; i < pNextLevel->taskNum; ++i) {
    SSchTask *pNextTask = taosArrayGet(pNextLevel->subTasks, i);

    // Tasks that have children are skipped here; only child-less tasks are launched directly.
    if (pNextTask->children && taosArrayGetSize(pNextTask->children) > 0) {
      continue;
    }

    SCH_ERR_RET(schLaunchTask(pJob, pNextTask));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
566
// Appends the table version info carried by a query response to the job's exec result.
//
// Nothing is saved when rsp->tbFName is empty. Otherwise, under resLock (write), the result
// array is created on first use, a STbVerInfo with the table name and the schema/tag versions
// is appended, and execRes.msgType is set to TDMT_SCH_QUERY.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be created or
// extended.
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
  if (0 == rsp->tbFName[0]) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  if (NULL == pJob->execRes.res) {
    pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
    if (NULL == pJob->execRes.res) {
      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  // Zero-initialized so the bounded copy below always leaves the name NUL-terminated.
  STbVerInfo tbInfo = {0};
  strncpy(tbInfo.tbFName, rsp->tbFName, sizeof(tbInfo.tbFName) - 1);
  tbInfo.sversion = rsp->sversion;
  tbInfo.tversion = rsp->tversion;

  if (NULL == taosArrayPush((SArray *)pJob->execRes.res, &tbInfo)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->execRes.msgType = TDMT_SCH_QUERY;

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return TSDB_CODE_SUCCESS;
}

592
// Looks up the task with the given taskId in the job's task list.
//
// The lookup is delegated to schGetTaskFromList, which reports its result through *pTask.
// Returns TSDB_CODE_SUCCESS when *pTask is non-NULL afterwards, otherwise
// TSDB_CODE_SCH_INTERNAL_ERROR.
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
  schGetTaskFromList(pJob->taskList, taskId, pTask);

  if (NULL == *pTask) {
    SCH_JOB_ELOG("task not found in job task list, taskId:0x%" PRIx64, taskId);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
602 603
// Starts execution of the job.
//
// In static explain mode the explain result is produced directly into pJob->fetchRes and the
// job is switched to JOB_TASK_STATUS_PART_SUCC. Otherwise the tasks of the level at
// pJob->levelIdx are launched.
int32_t schLaunchJob(SSchJob *pJob) {
  if (EXPLAIN_MODE_STATIC != pJob->attr.explainMode) {
    SSchLevel *pStartLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
    SCH_ERR_RET(schLaunchLevelTasks(pJob, pStartLevel));

    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
  SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));

  return TSDB_CODE_SUCCESS;
}

// Drops the tasks tracked in pJob->execTasks.
// The corresponding drops for succTasks / failTasks are currently disabled.
void schDropJobAllTasks(SSchJob *pJob) {
  schDropTaskInHashList(pJob, pJob->execTasks);
  //  schDropTaskInHashList(pJob, pJob->succTasks);
  //  schDropTaskInHashList(pJob, pJob->failTasks);
}

void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
  int64_t  refId = pJob->refId;

D
dapan1121 已提交
629 630
  qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);

D
dapan1121 已提交
631 632 633 634 635 636 637 638 639
  schDropJobAllTasks(pJob);

  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
  for (int32_t i = 0; i < numOfLevels; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
640
      schFreeTask(pJob, pTask);
D
dapan1121 已提交
641 642 643 644 645 646 647 648
    }

    taosArrayDestroy(pLevel->subTasks);
  }

  schFreeFlowCtrl(pJob);

  taosHashCleanup(pJob->execTasks);
H
Hongze Cheng 已提交
649 650
  //  taosHashCleanup(pJob->failTasks);
  //  taosHashCleanup(pJob->succTasks);
D
dapan1121 已提交
651
  taosHashCleanup(pJob->taskList);
H
Hongze Cheng 已提交
652

D
dapan1121 已提交
653 654 655 656 657 658
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
  taosArrayDestroy(pJob->dataSrcTasks);

  qExplainFreeCtx(pJob->explainCtx);

D
dapan1121 已提交
659
  destroyQueryExecRes(&pJob->execRes);
D
dapan1121 已提交
660

D
dapan1121 已提交
661
  qDestroyQueryPlan(pJob->pDag);
662
  nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);
D
dapan1121 已提交
663

D
dapan1121 已提交
664
  taosMemoryFreeClear(pJob->userRes.execRes);
D
dapan1121 已提交
665
  taosMemoryFreeClear(pJob->fetchRes);
D
dapan1121 已提交
666
  taosMemoryFreeClear(pJob->sql);
D
dapan1121 已提交
667
  taosMemoryFree(pJob);
D
dapan1121 已提交
668

D
dapan1121 已提交
669 670 671 672
  int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
  if (jobNum == 0) {
    schCloseJobRef();
  }
D
dapan1121 已提交
673 674

  qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
675 676
}

D
dapan1121 已提交
677
// Fetches the next result for the job.
//
// Static explain mode: a sync fetch dumps the result straight into userRes.fetchRes; otherwise
// the result is posted to the fetch waiter.
// All other modes: the fetch task is launched first; a sync fetch then waits on rspSem and
// dumps the result into userRes.fetchRes.
int32_t schJobFetchRows(SSchJob *pJob) {
  int32_t code = 0;

  if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) {
    if (!schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
      schPostJobRes(pJob, SCH_OP_FETCH);
      SCH_RET(code);
    }

    SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));

  if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
    SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
    SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
  }

  SCH_RET(code);
}

D
dapan1121 已提交
699
// Creates a scheduler job from pReq and registers it in schMgmt.jobRef.
//
// pJobId: receives the reference id of the new job on success.
// The job takes over pReq->pDag: on every failure path the plan is destroyed, either directly
// (allocation failure) or as part of freeing the job.
//
// Returns TSDB_CODE_SUCCESS or the first error code encountered.
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // calloc leaves refId at 0, which the cleanup below would mistake for a registered job.
  // Mark it as "not registered" until taosAddRef succeeds.
  pJob->refId = -1;

  // schFreeJobImpl decrements schMgmt.jobNum unconditionally, so count the job as soon as it
  // exists; every path that frees it is then balanced.
  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
  pJob->attr.localExec = pReq->localReq;
  pJob->conn = *pReq->pConn;
  if (pReq->sql) {
    pJob->sql = strdup(pReq->sql);
  }
  pJob->pDag = pReq->pDag;
  pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
  pJob->chkKillFp = pReq->chkKillFp;
  pJob->chkKillParam = pReq->chkKillParam;
  pJob->userRes.execFp = pReq->execFp;
  pJob->userRes.cbParam = pReq->cbParam;

  if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
  } else {
    pJob->nodeList = taosArrayDup(pReq->pNodeList);
  }

  pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                HASH_ENTRY_LOCK);
  if (NULL == pJob->taskList) {
    SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
  }

  pJob->execTasks = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                 HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&pJob->rspSem, 0, 0);

  pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
  if (pJob->refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  *pJobId = pJob->refId;

  SCH_JOB_DLOG("job refId:0x%" PRIx64 " created", pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  if (NULL == pJob) {
    // No job took ownership of the plan; release it here.
    qDestroyQueryPlan(pReq->pDag);
  } else if (pJob->refId < 0) {
    // Never registered: free directly.
    schFreeJobImpl(pJob);
  } else {
    taosRemoveRef(schMgmt.jobRef, pJob->refId);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
776 777
// Launches the job and, for a synchronous request, blocks on rspSem until a result is posted.
//
// Returns the launch error, otherwise TSDB_CODE_SUCCESS.
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
  qDebug("QID:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId);

  SCH_ERR_RET(schLaunchJob(pJob));

  if (pReq->syncReq) {
    SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
792
// Reports errCode straight to the request's callback, without going through a job.
//
// Does nothing for a NULL or synchronous request. The exec callback takes precedence over the
// fetch callback; the callback is invoked with a NULL result.
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
  if (NULL != pReq && !pReq->syncReq) {
    if (pReq->execFp) {
      (*pReq->execFp)(NULL, pReq->cbParam, errCode);
    } else if (pReq->fetchFp) {
      (*pReq->fetchFp)(NULL, pReq->cbParam, errCode);
    }
  }
}
D
dapan1121 已提交
803

D
dapan1121 已提交
804 805
// Checks, under opStatus.lock (read), whether the job's current operation is op.
//
// sync: when >= 0 the operation's syncReq flag must also equal sync; a negative value matches
// both synchronous and asynchronous operations.
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
  SCH_LOCK(SCH_READ, &pJob->opStatus.lock);

  bool matched = (pJob->opStatus.op == op);
  if (matched && sync >= 0) {
    matched = (pJob->opStatus.syncReq == sync);
  }

  SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);

  return matched;
}

H
Hongze Cheng 已提交
817
/**
 * Finish an operation on a job.
 *
 * For SCH_OP_EXEC and SCH_OP_FETCH with a non-NULL synchronous request, the
 * job's current op is swapped from `type` back to SCH_OP_NULL under the
 * opStatus write lock; an error is logged when the previous op was not
 * `type`. For SCH_OP_EXEC the execution result is then dumped into
 * pReq->pExecRes. For SCH_OP_GET_STATUS the error code is reset to success.
 * Finally, any remaining non-zero errCode is passed to schHandleJobFailure.
 *
 * @param pJob     job the operation ran on
 * @param type     operation being finished
 * @param pReq     originating request; may be NULL
 * @param errCode  result of the operation
 */
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode) {
  int32_t prevOp = 0;

  switch (type) {
    case SCH_OP_EXEC:
    case SCH_OP_FETCH:
      // Only synchronous requests clear the in-flight op here.
      if (NULL == pReq || !pReq->syncReq) {
        break;
      }

      SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
      prevOp = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
      if (SCH_OP_NULL == prevOp || prevOp != type) {
        SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(prevOp),
                     jobTaskStatusStr(pJob->status));
      }
      SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

      // Dumping the exec result applies to EXEC only and happens outside the lock.
      if (SCH_OP_EXEC == type) {
        schDumpJobExecRes(pJob, pReq->pExecRes);
      }
      break;
    case SCH_OP_GET_STATUS:
      // A status query never fails the job.
      errCode = TSDB_CODE_SUCCESS;
      break;
    default:
      break;
  }

  if (errCode) {
    schHandleJobFailure(pJob, errCode);
  }

  SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
}

H
Hongze Cheng 已提交
858
/**
 * Begin an operation on a job.
 *
 * SCH_OP_EXEC / SCH_OP_FETCH: under the opStatus write lock, try to swap the
 * job's current op from SCH_OP_NULL to `type`. If another op is already set,
 * log it, unlock, post TSDB_CODE_TSC_APP_ERROR through schDirectPostJobRes
 * and bail out with that code. On success record pReq->syncReq (and, for
 * FETCH, the fetch result pointer, fetch callback and callback parameter)
 * before unlocking. FETCH additionally requires SCH_JOB_NEED_FETCH(pJob) and
 * a job status of JOB_TASK_STATUS_PART_SUCC, using the status sampled on
 * entry to this function.
 *
 * SCH_OP_GET_STATUS: only validates that the job looks initialized and
 * returns immediately.
 *
 * Any other type is rejected with TSDB_CODE_TSC_APP_ERROR.
 *
 * After a successful EXEC/FETCH claim, the operation is aborted with
 * TSDB_CODE_SCH_IGNORE_ERROR if schJobNeedToStop reports true.
 *
 * @param pJob  job the operation targets
 * @param type  operation to start
 * @param pReq  originating request; dereferenced for EXEC and FETCH
 * @return TSDB_CODE_SUCCESS, or an error code as described above
 */
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq) {
  int32_t code = 0;
  int8_t  jobStatus = SCH_GET_JOB_STATUS(pJob);

  switch (type) {
    case SCH_OP_EXEC:
    case SCH_OP_FETCH:
      SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
      if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
        SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
        SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
        schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
        SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
      }

      SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));

      if (SCH_OP_FETCH == type) {
        // Remember where and how the fetch result must be delivered.
        pJob->userRes.fetchRes = pReq->pFetchRes;
        pJob->userRes.fetchFp = pReq->fetchFp;
        pJob->userRes.cbParam = pReq->cbParam;
      }

      pJob->opStatus.syncReq = pReq->syncReq;
      SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

      if (SCH_OP_FETCH == type) {
        if (!SCH_JOB_NEED_FETCH(pJob)) {
          SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
          SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
        }

        // Compared against the status sampled on entry, not a fresh read.
        if (jobStatus != JOB_TASK_STATUS_PART_SUCC) {
          SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(jobStatus));
          SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
        }
      }
      break;
    case SCH_OP_GET_STATUS:
      if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
        qDebug("job not initialized or not executable job, refId:0x%" PRIx64, pJob->refId);
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }
      // Status queries skip the need-to-stop check below.
      return TSDB_CODE_SUCCESS;
    default:
      SCH_JOB_ELOG("unknown operation type %d", type);
      SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
  }

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
924

D
dapan1121 已提交
925 926 927 928
/**
 * Finish processing of a task callback: unlock the task, report a failure to
 * the job if one occurred, and release the job reference.
 *
 * Both pTask and pJob may be NULL. The original code checked pJob for NULL
 * only before releasing it, yet passed it unguarded to schHandleJobFailure
 * when errCode was non-zero. The NULL case is now handled before any job
 * access, so a missing job can never reach the failure handler.
 * NOTE(review): schHandleJobFailure is defined elsewhere; it presumably
 * dereferences the job - confirm.
 *
 * @param pJob     job the callback belongs to; may be NULL
 * @param pTask    task locked by the matching "begin" step; may be NULL
 * @param errCode  result of the callback handling; non-zero marks a failure
 */
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (pTask) {
    SCH_UNLOCK_TASK(pTask);
  }

  // Without a job there is nothing to fail and nothing to release.
  if (NULL == pJob) {
    return;
  }

  if (errCode) {
    schHandleJobFailure(pJob, errCode);
  }

  schReleaseJob(pJob->refId);
}

H
Hongze Cheng 已提交
939
/**
 * Prepare for handling a task callback: acquire the job by reference id,
 * make sure it does not need to stop, look up the task and lock it.
 *
 * On success the acquired job and the locked task are stored in *job and
 * *task. On failure neither output is written; if the job had been acquired
 * it is released again before returning.
 *
 * @param job   out: acquired job (written on success only)
 * @param task  out: locked task (written on success only)
 * @param qId   query id, used for logging only
 * @param rId   job reference id passed to schAcquireJob / schReleaseJob
 * @param tId   task id looked up inside the job
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_JOB_NOT_EXIST when the job cannot
 *         be acquired, TSDB_CODE_SCH_IGNORE_ERROR when the job needs to stop,
 *         or the error reported by schGetTaskInJob
 */
int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId) {
  int32_t   code = 0;
  int8_t    jobStatus = 0;
  SSchTask *pTask = NULL;
  SSchJob  *pJob = schAcquireJob(rId);

  if (NULL == pJob) {
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
    SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
  }

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_ERR_JRET(schGetTaskInJob(pJob, tId, &pTask));

  // The task stays locked on success; the matching "end" step unlocks it.
  SCH_LOCK_TASK(pTask);

  *job = pJob;
  *task = pTask;

  return TSDB_CODE_SUCCESS;

_return:

  // Error path: undo whatever was obtained so far.
  if (pTask) {
    SCH_UNLOCK_TASK(pTask);
  }
  if (pJob) {
    schReleaseJob(rId);
  }

  SCH_RET(code);
}