schJob.c 30.2 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
D
dapan1121 已提交
19
#include "schInt.h"
D
dapan1121 已提交
20 21 22 23
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

D
dapan1121 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
/**
 * Record an error code on the job.
 *
 * TSDB_CODE_SUCCESS is ignored. Otherwise errCode is stored when the job has
 * no error yet, or when the stored error does not satisfy
 * NEED_CLIENT_HANDLE_ERROR() while errCode does. A stored code that already
 * satisfies NEED_CLIENT_HANDLE_ERROR() is never replaced.
 *
 * @param pJob    job whose errCode field is updated
 * @param errCode error code to record
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SUCCESS == errCode) {
    return;
  }

  bool    updated = false;
  int32_t curCode = atomic_load_32(&pJob->errCode);

  if (TSDB_CODE_SUCCESS == curCode) {
    // No error recorded yet: try to install ours, but only if nobody beat us to it.
    if (curCode == atomic_val_compare_exchange_32(&pJob->errCode, curCode, errCode)) {
      updated = true;
    } else {
      curCode = atomic_load_32(&pJob->errCode);
    }
  }

  // An existing "client must handle" code wins; otherwise such a new code replaces the old one.
  if (!updated && !NEED_CLIENT_HANDLE_ERROR(curCode) && NEED_CLIENT_HANDLE_ERROR(errCode)) {
    atomic_store_32(&pJob->errCode, errCode);
    updated = true;
  }

  if (updated) {
    SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
  }
}

D
dapan1121 已提交
53 54
/**
 * Tell whether the job has reached a final status.
 *
 * @return true when the job status is FAIL, DROP or SUCC; false for any other status
 */
bool schJobDone(SSchJob *pJob) {
  switch (SCH_GET_JOB_STATUS(pJob)) {
    case JOB_TASK_STATUS_FAIL:
    case JOB_TASK_STATUS_DROP:
    case JOB_TASK_STATUS_SUCC:
      return true;
    default:
      return false;
  }
}
D
dapan1121 已提交
58

D
dapan1121 已提交
59 60 61 62 63 64
/**
 * Tell whether processing of the job should stop.
 *
 * @param pJob    job to inspect
 * @param pStatus optional out-parameter; when non-NULL it receives the job
 *                status sampled on entry
 * @return true when the job is already done (see schJobDone()) or when the
 *         kill-check callback reports the query as killed, in which case
 *         TSDB_CODE_TSC_QUERY_KILLED is recorded on the job; false otherwise
 */
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (pStatus) {
    *pStatus = status;
  }

  if (schJobDone(pJob)) {
    return true;
  }

  // The callback is taken as-is from the scheduler request and may be unset;
  // a job without a kill checker is simply never reported as killed.
  if (pJob->chkKillFp && (*pJob->chkKillFp)(pJob->chkKillParam)) {
    schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
    return true;
  }

  return false;
}

D
dapan1121 已提交
77
/**
 * Move the job to newStatus if the transition from its current status is legal.
 *
 * The current status is re-read and the transition re-validated every time the
 * compare-and-exchange on pJob->status loses against a concurrent update.
 *
 * @param pJob      job to update
 * @param newStatus requested status
 * @return TSDB_CODE_SUCCESS when the status was changed;
 *         TSDB_CODE_SCH_IGNORE_ERROR when the job already has newStatus;
 *         TSDB_CODE_QRY_JOB_FREED when the job is in DROP status;
 *         TSDB_CODE_APP_ERROR for any other disallowed transition or an
 *         unknown current status
 */
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    if (newStatus == oriStatus) {
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    bool allowed = false;

    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        allowed = (JOB_TASK_STATUS_INIT == newStatus);
        break;
      case JOB_TASK_STATUS_INIT:
        allowed = (JOB_TASK_STATUS_EXEC == newStatus || JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_EXEC:
        allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus || JOB_TASK_STATUS_FAIL == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_PART_SUCC:
        allowed = (JOB_TASK_STATUS_FAIL == newStatus || JOB_TASK_STATUS_SUCC == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus || JOB_TASK_STATUS_EXEC == newStatus ||
                   JOB_TASK_STATUS_FETCH == newStatus);
        break;
      case JOB_TASK_STATUS_FETCH:
        allowed = (JOB_TASK_STATUS_FAIL == newStatus || JOB_TASK_STATUS_SUCC == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus || JOB_TASK_STATUS_EXEC == newStatus);
        break;
      case JOB_TASK_STATUS_SUCC:
      case JOB_TASK_STATUS_FAIL:
        allowed = (JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_DROP:
        // NOTE(review): no break, as in the original layout; SCH_ERR_JRET is
        // presumed to leave through _return for a non-success code.
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);

      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
    }

    if (!allowed) {
      SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
    }

    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      return TSDB_CODE_SUCCESS;
    }

    // Status changed underneath us: start over with the fresh value.
  }

_return:

  if (TSDB_CODE_SCH_IGNORE_ERROR != code) {
    SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  } else {
    SCH_JOB_DLOG("ignore job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  }

  SCH_RET(code);
}

D
dapan1121 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
/**
 * Wire every task of the job to its child and parent tasks.
 *
 * For each task, the children/parents of its subplan are looked up in
 * planToTask and the resulting task pointers are appended to pTask->children
 * and pTask->parents. The plan shape is validated along the way: tasks on the
 * lowest level (pJob->levelIdx) must have no children, tasks on level 0 must
 * have no parents, and tasks on any other level must have at least one parent.
 * For query jobs, level 0 must hold a single task, and attr.needFetch is set
 * unless that task is a MODIFY subplan with explain disabled.
 *
 * @param pJob       job whose levels/tasks have already been created
 * @param planToTask hash keyed by subplan pointer, holding the SSchTask pointer
 *                   created for that subplan
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR for an inconsistent
 *         plan, or TSDB_CODE_OUT_OF_MEMORY when an array cannot be grown
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("children info, the %d child TID 0x%" PRIx64, n, (*childTask)->taskId);
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }
      } else if (0 != pLevel->level) {
        // Only the top level (level 0) may hold tasks without a parent.
        SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n indexes the parent list here, so label it as such in the log.
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("parents info, the %d parent TID 0x%" PRIx64, n, (*parentTask)->taskId);
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob)) {
    if (pLevel->taskNum > 1) {
      SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
    if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) {
      pJob->attr.needFetch = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
253
/**
 * Remember pTask as a data-source task of the job.
 *
 * Tasks for which SCH_IS_DATA_BIND_QRY_TASK() is false are skipped.
 *
 * @param pJob  job owning the dataSrcTasks array
 * @param pTask task to register; its pointer (not a copy) is stored
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the pointer
 *         cannot be appended
 */
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  // Same failure convention as every other taosArrayPush in this file:
  // a NULL result means the element was not stored.
  if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
    SCH_TASK_ELOG("taosArrayPush task to dataSrcTasks failed, code:%x", TSDB_CODE_OUT_OF_MEMORY);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Validate the query plan and build the job's level/task structures from it.
 *
 * One SSchLevel is created per entry of pDag->pSubplans and one SSchTask per
 * subplan in that level. Every task is initialised with schInitTask(),
 * offered to schAppendJobDataSrc(), and registered in pJob->taskList by task
 * id. A temporary subplan-to-task hash is then used by schBuildTaskRalation()
 * to link parents and children; it is released before returning.
 *
 * @param pDag query plan to schedule
 * @param pJob job being initialised; pJob->taskList must already exist
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for an empty or
 *         malformed plan, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or
 *         the error returned by one of the helpers
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
  if (NULL == pJob->dataSrcTasks) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Keyed by subplan pointer, hence the hash function is picked by pointer width.
  SHashObj *planToTask = taosHashInit(
      pDag->numOfSubplans,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    // Report the capacity that was actually requested.
    SCH_JOB_ELOG("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_INIT;

  for (int32_t i = 0; i < levelNum; ++i) {
    // Push a template first, then fill in the copy that lives inside the array.
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == pTask) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel));

      SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:

  // The lookup table is only needed while the relations are being built.
  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

H
Hongze Cheng 已提交
380
/**
 * Copy the job's execution result into pRes.
 *
 * The result payload pointer (execRes.res) is handed over to pRes and cleared
 * on the job, so the job no longer references it afterwards.
 *
 * @param pJob source job
 * @param pRes destination, filled with code, row count, payload, message type
 *             and byte count
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
  pRes->code = atomic_load_32(&pJob->errCode);
  pRes->numOfRows = pJob->resNumOfRows;

  pRes->msgType = pJob->execRes.msgType;
  pRes->numOfBytes = pJob->execRes.numOfBytes;

  // Move the payload pointer from the job to the caller's result.
  pRes->res = pJob->execRes.res;
  pJob->execRes.res = NULL;

  SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
393
/**
 * Hand the pending fetch response of the job to the caller.
 *
 * Runs under the write side of pJob->resLock. The job is marked as fetched;
 * if the pending response is flagged completed, the job is first switched to
 * SUCC. The response pointer is then detached from the job (pJob->fetchRes
 * becomes NULL) and stored in *pData. When nothing was pending, an empty
 * response with completed = 1 is allocated and returned instead.
 *
 * @param pJob  job to read from
 * @param pData out-parameter receiving the response pointer
 * @return TSDB_CODE_SUCCESS, the error from schSwitchJobStatus(), or
 *         TSDB_CODE_OUT_OF_MEMORY when the placeholder response cannot be
 *         allocated (in that case *pData is NULL)
 */
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
  int32_t code = 0;

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  pJob->fetched = true;

  if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) {
    SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
  }

  // Atomically take the pending response out of the job, retrying if it was
  // replaced between the load and the exchange.
  while (true) {
    *pData = atomic_load_ptr(&pJob->fetchRes);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->fetchRes, *pData, NULL)) {
      continue;
    }

    break;
  }

  if (NULL == *pData) {
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
    if (NULL == rsp) {
      // Do not report success with a NULL result: surface the allocation failure.
      SCH_JOB_ELOG("calloc %d failed", (int32_t)sizeof(SRetrieveTableRsp));
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    rsp->completed = 1;

    *pData = rsp;
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
  }

  SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows);

_return:

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return code;
}

H
Hongze Cheng 已提交
432 433
/**
 * Invoke the user's exec callback with the job's execution result.
 *
 * A result object is allocated and filled via schDumpJobExecRes(); when the
 * allocation fails the callback is still invoked, with a NULL result.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schNotifyUserExecRes(SSchJob *pJob) {
  SExecResult *pExecRes = taosMemoryCalloc(1, sizeof(SExecResult));
  if (NULL != pExecRes) {
    schDumpJobExecRes(pJob, pExecRes);
  }

  SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));

  (*pJob->userRes.execFp)(pExecRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));

  SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
445 446 447
/**
 * Invoke the user's fetch callback with the job's pending fetch response.
 *
 * The response is obtained through schDumpJobFetchRes(); its return code is
 * not inspected, and the callback receives the job's current errCode.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schNotifyUserFetchRes(SSchJob *pJob) {
  void *pFetched = NULL;
  schDumpJobFetchRes(pJob, &pFetched);

  SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode));

  (*pJob->userRes.fetchFp)(pFetched, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));

  SCH_JOB_DLOG("sch end from fetch cb, code: %s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
457
/**
 * Deliver the job result to whoever is waiting for the current operation.
 *
 * Nothing is delivered when the job is not in any operation, or when op is
 * non-zero and differs from the operation in progress. Otherwise a
 * synchronous waiter is woken through rspSem, or the matching asynchronous
 * user callback (exec or fetch) is invoked. The operation lock is held only
 * while deciding what to do and is released before the delivery itself.
 *
 * @param pJob job whose result is posted
 * @param op   expected operation, or 0 to accept whichever is in progress
 */
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
  enum { POST_NONE, POST_SYNC, POST_ASYNC_EXEC, POST_ASYNC_FETCH } target = POST_NONE;

  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (SCH_OP_NULL == pJob->opStatus.op) {
    SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  if (op && op != pJob->opStatus.op) {
    SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  // Classify the pending operation while still holding the lock.
  if (SCH_JOB_IN_SYNC_OP(pJob)) {
    target = POST_SYNC;
  } else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
    target = POST_ASYNC_EXEC;
  } else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
    target = POST_ASYNC_FETCH;
  }

  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

  switch (target) {
    case POST_SYNC:
      tsem_post(&pJob->rspSem);
      break;
    case POST_ASYNC_EXEC:
      schNotifyUserExecRes(pJob);
      break;
    case POST_ASYNC_FETCH:
      schNotifyUserFetchRes(pJob);
      break;
    default:
      SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
      break;
  }
}

D
dapan1121 已提交
491
/**
 * React to a job failure: record the error on the job and post the result.
 *
 * @param pJob    failed job
 * @param errCode failure reason; TSDB_CODE_SCH_IGNORE_ERROR means nothing is
 *                recorded or posted
 * @return always TSDB_CODE_SCH_IGNORE_ERROR
 */
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  schUpdateJobErrCode(pJob, errCode);

  // Log the code actually stored on the job, which may differ from errCode.
  int32_t jobCode = atomic_load_32(&pJob->errCode);
  if (0 != jobCode) {
    SCH_JOB_DLOG("job failed with error %s", tstrerror(jobCode));
  }

  schPostJobRes(pJob, 0);

  SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}

D
dapan1121 已提交
508
/**
 * Switch the job to FAIL status because of errCode.
 *
 * @param pJob    job to fail
 * @param errCode failure reason; TSDB_CODE_SCH_IGNORE_ERROR leaves the job untouched
 * @return always TSDB_CODE_SCH_IGNORE_ERROR
 */
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}

H
Hongze Cheng 已提交
517
/**
 * React to a dropped job; handled exactly like a failure with errCode.
 *
 * @return the value returned by schProcessOnJobFailure()
 */
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
D
dapan1121 已提交
518 519 520 521 522 523 524 525

/**
 * Switch the job to DROP status because of errCode.
 *
 * @param pJob    job to drop
 * @param errCode drop reason; TSDB_CODE_SCH_IGNORE_ERROR leaves the job untouched
 * @return always TSDB_CODE_SCH_IGNORE_ERROR
 */
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}

H
Hongze Cheng 已提交
528
/**
 * React to the job reaching partial success.
 *
 * When a fetch operation is in progress (sync or async), the fetch task is
 * launched; otherwise the job result is posted to the current operation.
 *
 * @return TSDB_CODE_SUCCESS, or the error from schLaunchFetchTask()
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  // sync = -1: match a fetch operation regardless of its sync/async flavour.
  if (!schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
    schPostJobRes(pJob, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
538
/**
 * Called once fetched data is available: post the result to a pending fetch operation.
 */
void schProcessOnDataFetched(SSchJob *pJob) {
  schPostJobRes(pJob, SCH_OP_FETCH);
}
D
dapan1121 已提交
539

D
dapan1121 已提交
540
/**
 * Store the explain response on the job and mark the task as succeeded.
 *
 * The row count carried by pRsp is byte-swapped with htobe64() before being
 * stored; pRsp itself becomes the job's pending fetch response. For jobs that
 * are not insert jobs the result is then posted to a pending fetch operation.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", htobe64(pRsp->numOfRows), pRsp->completed);

  atomic_store_64(&pJob->resNumOfRows, htobe64(pRsp->numOfRows));
  atomic_store_ptr(&pJob->fetchRes, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);

  if (SCH_IS_INSERT_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
555
/**
 * Account for pTask finishing and, once its whole level is done, launch the
 * next level of the job.
 *
 * Only applies to query jobs. The level's done counter is incremented; when it
 * reaches the level's task count, pJob->levelIdx is decremented and every task
 * on the new current level that has no children and is not already launched
 * is started with schLaunchTask().
 *
 * @return TSDB_CODE_SUCCESS, or the first error from schLaunchTask()
 */
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  SSchLevel *pDoneLevel = pTask->level;
  int32_t    doneNum = atomic_add_fetch_32(&pDoneLevel->taskExecDoneNum, 1);
  if (doneNum != pDoneLevel->taskNum) {
    // Other tasks of this level are still running.
    return TSDB_CODE_SUCCESS;
  }

  atomic_sub_fetch_32(&pJob->levelIdx, 1);

  SSchLevel *pNextLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
  for (int32_t i = 0; i < pNextLevel->taskNum; ++i) {
    SSchTask *pCandidate = taosArrayGet(pNextLevel->subTasks, i);

    bool hasChildren = pCandidate->children && taosArrayGetSize(pCandidate->children) > 0;
    if (hasChildren || SCH_TASK_ALREADY_LAUNCHED(pCandidate)) {
      continue;
    }

    SCH_ERR_RET(schLaunchTask(pJob, pCandidate));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
584
/**
 * Append the table version info carried by a query response to the job's
 * execution result.
 *
 * Responses with an empty tbFName are ignored. The result array is created on
 * first use, and all updates happen under the write side of pJob->resLock.
 *
 * @param pJob job collecting the results
 * @param rsp  query response providing tbFName, sversion and tversion
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot
 *         be created or grown
 */
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
  if (0 == rsp->tbFName[0]) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  if (NULL == pJob->execRes.res) {
    pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
    if (NULL == pJob->execRes.res) {
      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  // Zero the whole record so no indeterminate bytes get copied into the array
  // and the name stays NUL-terminated after the bounded copy below.
  // NOTE(review): assumes STbVerInfo.tbFName is a fixed-size char array — confirm.
  STbVerInfo tbInfo = {0};
  strncpy(tbInfo.tbFName, rsp->tbFName, sizeof(tbInfo.tbFName) - 1);
  tbInfo.sversion = rsp->sversion;
  tbInfo.tversion = rsp->tversion;

  if (NULL == taosArrayPush((SArray *)pJob->execRes.res, &tbInfo)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->execRes.msgType = TDMT_SCH_QUERY;

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return TSDB_CODE_SUCCESS;
}

610
/**
 * Look up a task of the job by its task id.
 *
 * @param pJob   job whose taskList is searched
 * @param taskId id of the wanted task
 * @param pTask  out-parameter filled by schGetTaskFromList()
 * @return TSDB_CODE_SUCCESS when *pTask is non-NULL after the lookup,
 *         TSDB_CODE_SCH_INTERNAL_ERROR otherwise
 */
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
  schGetTaskFromList(pJob->taskList, taskId, pTask);

  if (NULL != *pTask) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_JOB_ELOG("task not found in job task list, taskId:0x%" PRIx64, taskId);
  SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
620 621
/**
 * Start executing the job.
 *
 * In static explain mode the explain result is produced locally into
 * pJob->fetchRes and the job moves straight to PART_SUCC. Otherwise the tasks
 * of the current level (pJob->levelIdx) are launched.
 *
 * @return TSDB_CODE_SUCCESS, or the first error from the helper that failed
 */
int32_t schLaunchJob(SSchJob *pJob) {
  if (EXPLAIN_MODE_STATIC != pJob->attr.explainMode) {
    SSchLevel *pStartLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
    SCH_ERR_RET(schLaunchLevelTasks(pJob, pStartLevel));

    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
  SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));

  return TSDB_CODE_SUCCESS;
}

/**
 * Drop the tasks tracked in the job's execTasks list.
 *
 * Only execTasks is processed; drops for the succTasks/failTasks lists are
 * disabled in this version of the code.
 */
void schDropJobAllTasks(SSchJob *pJob) {
  schDropTaskInHashList(pJob, pJob->execTasks);
}

void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
  int64_t  refId = pJob->refId;

D
dapan1121 已提交
647 648
  qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);

D
dapan1121 已提交
649 650 651 652 653 654 655 656 657
  schDropJobAllTasks(pJob);

  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
  for (int32_t i = 0; i < numOfLevels; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
658
      schFreeTask(pJob, pTask);
D
dapan1121 已提交
659 660 661 662 663 664 665 666
    }

    taosArrayDestroy(pLevel->subTasks);
  }

  schFreeFlowCtrl(pJob);

  taosHashCleanup(pJob->execTasks);
H
Hongze Cheng 已提交
667 668
  //  taosHashCleanup(pJob->failTasks);
  //  taosHashCleanup(pJob->succTasks);
D
dapan1121 已提交
669
  taosHashCleanup(pJob->taskList);
H
Hongze Cheng 已提交
670

D
dapan1121 已提交
671 672 673 674 675 676
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
  taosArrayDestroy(pJob->dataSrcTasks);

  qExplainFreeCtx(pJob->explainCtx);

D
dapan1121 已提交
677
  destroyQueryExecRes(&pJob->execRes);
D
dapan1121 已提交
678

D
dapan1121 已提交
679
  qDestroyQueryPlan(pJob->pDag);
680
  nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);
D
dapan1121 已提交
681

D
dapan1121 已提交
682
  taosMemoryFreeClear(pJob->userRes.execRes);
D
dapan1121 已提交
683
  taosMemoryFreeClear(pJob->fetchRes);
D
dapan1121 已提交
684
  taosMemoryFreeClear(pJob->sql);
D
dapan1121 已提交
685
  tsem_destroy(&pJob->rspSem);
D
dapan1121 已提交
686
  taosMemoryFree(pJob);
D
dapan1121 已提交
687

D
dapan1121 已提交
688 689 690 691
  int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
  if (jobNum == 0) {
    schCloseJobRef();
  }
D
dapan1121 已提交
692 693

  qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
694 695
}

D
dapan1121 已提交
696
/**
 * Fetch the next batch of result rows for the job.
 *
 * For static explain jobs and for explain-insert jobs no fetch task is
 * launched: a synchronous fetch dumps the buffered result directly, an
 * asynchronous one posts it to the pending fetch operation. For every other
 * job the fetch task is launched first; a synchronous fetch then waits on
 * rspSem and dumps the result into pJob->userRes.fetchRes.
 *
 * @return TSDB_CODE_SUCCESS, or the error from schLaunchFetchTask() /
 *         schDumpJobFetchRes()
 */
int32_t schJobFetchRows(SSchJob *pJob) {
  int32_t code = 0;

  // True when the result is already held locally and no remote fetch is needed.
  bool localRes = (pJob->attr.explainMode == EXPLAIN_MODE_STATIC) ||
                  (SCH_IS_EXPLAIN_JOB(pJob) && SCH_IS_INSERT_JOB(pJob));

  if (!localRes) {
    SCH_ERR_RET(schLaunchFetchTask(pJob));
  }

  if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
    if (!localRes) {
      SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
      tsem_wait(&pJob->rspSem);
    }

    SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
  }

  if (localRes) {
    schPostJobRes(pJob, SCH_OP_FETCH);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
718
/**
 * Create a scheduler job for the request and register it in the job ref set.
 *
 * The job copies the request's attributes, callbacks, SQL text and node list,
 * takes over the query plan (pReq->pDag), builds its task structures with
 * schValidateAndBuildJob(), and is finally added to schMgmt.jobRef. On any
 * failure everything created so far, including the plan, is released.
 *
 * @param pJobId out-parameter receiving the job's reference id on success
 * @param pReq   scheduler request describing the job
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or
 *         the error reported by the failing helper
 */
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // calloc leaves refId at 0, which the cleanup path cannot tell apart from a
  // registered job. Mark the job as "not registered yet" explicitly.
  pJob->refId = -1;

  // Plain field copies first, so that the job already owns the plan and the
  // allocator reference before anything that can fail is attempted.
  pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
  pJob->attr.localExec = pReq->localReq;
  pJob->conn = *pReq->pConn;
  pJob->pDag = pReq->pDag;
  pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
  pJob->chkKillFp = pReq->chkKillFp;
  pJob->chkKillParam = pReq->chkKillParam;
  pJob->userRes.execFp = pReq->execFp;
  pJob->userRes.cbParam = pReq->cbParam;

  if (pReq->sql) {
    pJob->sql = taosStrdup(pReq->sql);
    if (NULL == pJob->sql) {
      qError("QID:0x%" PRIx64 " strdup sql failed", pReq->pDag->queryId);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
  } else {
    pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
    if (NULL == pJob->nodeList) {
      qError("QID:0x%" PRIx64 " dup nodeList failed", pReq->pDag->queryId);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                HASH_ENTRY_LOCK);
  if (NULL == pJob->taskList) {
    SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
  }

  pJob->execTasks = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                 HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (tsem_init(&pJob->rspSem, 0, 0)) {
    SCH_JOB_ELOG("tsem_init failed, errno:%d", errno);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
  if (pJob->refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  *pJobId = pJob->refId;

  SCH_JOB_DLOG("job refId:0x%" PRIx64 " created", pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  if (NULL == pJob) {
    // No job took ownership of the plan, so release it here.
    qDestroyQueryPlan(pReq->pDag);
  } else if (pJob->refId < 0) {
    // Never registered in the ref set: free it directly. schFreeJobImpl()
    // decrements schMgmt.jobNum, and this job was never counted, so count it
    // first to keep the global total balanced.
    atomic_add_fetch_32(&schMgmt.jobNum, 1);
    schFreeJobImpl(pJob);
  } else {
    taosRemoveRef(schMgmt.jobRef, pJob->refId);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
798 799
/**
 * Launch the job and, for synchronous requests, wait for its response.
 *
 * @param pJob job to execute
 * @param pReq originating request; syncReq selects whether to block on rspSem
 * @return TSDB_CODE_SUCCESS, or the error from schLaunchJob()
 */
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
  qDebug("QID:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId);

  SCH_ERR_RET(schLaunchJob(pJob));

  if (pReq->syncReq) {
    // Block until the result is posted through rspSem.
    SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
814
/*
 * Deliver errCode straight to the callback carried by an asynchronous
 * request. Does nothing when pReq is NULL or the request is synchronous.
 *
 * The exec callback takes precedence; the fetch callback is only used
 * when no exec callback is set. Both are invoked with a NULL first argument.
 */
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
  if (NULL == pReq) {
    return;
  }

  if (pReq->syncReq) {
    return;
  }

  if (pReq->execFp) {
    (*pReq->execFp)(NULL, pReq->cbParam, errCode);
    return;
  }

  if (pReq->fetchFp) {
    (*pReq->fetchFp)(NULL, pReq->cbParam, errCode);
  }
}
D
dapan1121 已提交
825

D
dapan1121 已提交
826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849
/*
 * Check whether a job may be retried after receiving rspCode and, if so,
 * move it back to JOB_TASK_STATUS_EXEC.
 *
 * Only jobs whose status is at least JOB_TASK_STATUS_PART_SUCC are touched.
 * For those, a job whose `fetched` flag is set is not reset: the error is
 * logged and rspCode is handed to SCH_ERR_RET.
 *
 * pJob     job to inspect / reset
 * rspCode  error code that triggered the retry attempt
 *
 * Returns TSDB_CODE_SUCCESS when the job is (or stays) eligible for retry.
 */
int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
  if (pJob->status >= JOB_TASK_STATUS_PART_SUCC) {
    // Sample the flag under resLock and release the lock exactly once,
    // whichever branch is taken afterwards.
    SCH_LOCK(SCH_WRITE, &pJob->resLock);
    bool alreadyFetched = pJob->fetched;
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

    if (alreadyFetched) {
      // Job-level log and plain return macro: this function has no pTask in
      // scope, no `code` local and no `_return` label.
      SCH_JOB_ELOG("already fetched while got error %s", tstrerror(rspCode));
      SCH_ERR_RET(rspCode);
    }

    schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
  }

  return TSDB_CODE_SUCCESS;
}


/*
 * Handle a job-level retry triggered by rspCode.
 *
 * Currently this only runs schChkResetJobRetry(); pTask and pMsg are
 * accepted but not used yet.
 *
 * Returns the resulting code via SCH_RET: TSDB_CODE_SUCCESS when the
 * check passes, otherwise the error it reported.
 */
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
  int32_t code = 0;

  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));

_return:

  // Single exit point: gives SCH_ERR_JRET its jump target and guarantees the
  // function always returns a value.
  SCH_RET(code);
}

D
dapan1121 已提交
850 851
/*
 * Report whether the job's current operation equals `op`.
 *
 * When `sync` is non-negative the job's recorded syncReq must also equal
 * `sync`; a negative `sync` skips that comparison. The operation status
 * is read under the opStatus read lock.
 */
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
  bool matched = false;

  SCH_LOCK(SCH_READ, &pJob->opStatus.lock);
  matched = (pJob->opStatus.op == op);
  if (matched && sync >= 0) {
    matched = (pJob->opStatus.syncReq == sync);
  }
  SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);

  return matched;
}

H
Hongze Cheng 已提交
863
/*
 * Finish an operation of the given type on a job.
 *
 * - SCH_OP_EXEC / SCH_OP_FETCH with a synchronous request: under the
 *   opStatus write lock, swap the current op from `type` back to
 *   SCH_OP_NULL and log an error if the job was not in that op. For
 *   SCH_OP_EXEC the execution result is then dumped into pReq->pExecRes.
 * - SCH_OP_GET_STATUS: errCode is forced to TSDB_CODE_SUCCESS.
 * - Any other type: nothing type-specific is done.
 *
 * Afterwards a non-zero errCode is passed to schHandleJobFailure().
 */
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode) {
  int32_t prevOp = 0;
  bool    isExec = (SCH_OP_EXEC == type);
  bool    isFetch = (SCH_OP_FETCH == type);

  if ((isExec || isFetch) && pReq && pReq->syncReq) {
    SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
    prevOp = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
    if (SCH_OP_NULL == prevOp || prevOp != type) {
      SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(prevOp),
                   jobTaskStatusStr(pJob->status));
    }
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

    if (isExec) {
      schDumpJobExecRes(pJob, pReq->pExecRes);
    }
  } else if (SCH_OP_GET_STATUS == type) {
    errCode = TSDB_CODE_SUCCESS;
  }

  if (errCode) {
    schHandleJobFailure(pJob, errCode);
  }

  SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
}

H
Hongze Cheng 已提交
904
/*
 * Begin an operation of the given type on a job.
 *
 * - SCH_OP_EXEC / SCH_OP_FETCH: under the opStatus write lock, claim the
 *   operation slot by swapping SCH_OP_NULL -> type. If another op is already
 *   recorded, the error is posted through schDirectPostJobRes() and
 *   TSDB_CODE_APP_ERROR is reported. Otherwise the request's syncReq is
 *   recorded; for SCH_OP_FETCH the fetch result pointer, fetch callback and
 *   callback parameter are also stored, and after unlocking the job must
 *   need a fetch and must have been in JOB_TASK_STATUS_PART_SUCC when this
 *   function was entered.
 * - SCH_OP_GET_STATUS: validates that the job is initialized and has levels,
 *   then returns immediately.
 * - Anything else: TSDB_CODE_APP_ERROR.
 *
 * For EXEC/FETCH a final schJobNeedToStop() check yields
 * TSDB_CODE_SCH_IGNORE_ERROR when the job should not proceed.
 */
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq) {
  int32_t code = 0;
  int8_t  status = SCH_GET_JOB_STATUS(pJob);

  switch (type) {
    case SCH_OP_EXEC:
    case SCH_OP_FETCH: {
      const bool isFetch = (SCH_OP_FETCH == type);

      SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
      if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
        SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
        SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
        schDirectPostJobRes(pReq, TSDB_CODE_APP_ERROR);
        SCH_ERR_RET(TSDB_CODE_APP_ERROR);
      }

      SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));

      if (isFetch) {
        // Remember where the fetch result and its callback should go.
        pJob->userRes.fetchRes = pReq->pFetchRes;
        pJob->userRes.fetchFp = pReq->fetchFp;
        pJob->userRes.cbParam = pReq->cbParam;
      }

      pJob->opStatus.syncReq = pReq->syncReq;
      SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

      if (isFetch) {
        if (!SCH_JOB_NEED_FETCH(pJob)) {
          SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
          SCH_ERR_RET(TSDB_CODE_APP_ERROR);
        }

        // Compared against the status sampled on entry, not a fresh read.
        if (status != JOB_TASK_STATUS_PART_SUCC) {
          SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
          SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
        }
      }
      break;
    }

    case SCH_OP_GET_STATUS:
      if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
        qDebug("job not initialized or not executable job, refId:0x%" PRIx64, pJob->refId);
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }
      // Status queries skip the need-to-stop check below.
      return TSDB_CODE_SUCCESS;

    default:
      SCH_JOB_ELOG("unknown operation type %d", type);
      SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  if (schJobNeedToStop(pJob, &status)) {
    SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
970

D
dapan1121 已提交
971 972 973 974
/*
 * Finish callback processing: unlock the task, report a failure on the job
 * when errCode is non-zero, and release the job reference.
 *
 * pJob     job whose reference is released; may be NULL
 * pTask    task to unlock; may be NULL
 * errCode  non-zero values are forwarded to schHandleJobFailure()
 *
 * Failure handling is skipped when pJob is NULL, so a NULL job is never
 * passed on to schHandleJobFailure().
 */
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (pTask) {
    SCH_UNLOCK_TASK(pTask);
  }

  if (NULL == pJob) {
    // No job to fail or release.
    return;
  }

  if (errCode) {
    schHandleJobFailure(pJob, errCode);
  }

  schReleaseJob(pJob->refId);
}

H
Hongze Cheng 已提交
985
/*
 * Prepare for callback processing: acquire the job by reference id, make
 * sure it is still worth processing, look up the task and lock it.
 *
 * job, task  out parameters, written only on success
 * qId, tId   query and task ids (qId is used for logging; tId also selects
 *            the task inside the job)
 * rId        job reference id passed to schAcquireJob()/schReleaseJob()
 *
 * Returns TSDB_CODE_SUCCESS with *job / *task set and the task locked.
 * Reports TSDB_CODE_QRY_JOB_NOT_EXIST when the job cannot be acquired and
 * TSDB_CODE_SCH_IGNORE_ERROR when schJobNeedToStop() says the job should
 * stop; a task lookup failure is propagated. On the failure paths after
 * the job was acquired, the job reference is released again.
 */
int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId) {
  int32_t   code = 0;
  int8_t    jobStatus = 0;
  SSchTask *pTask = NULL;
  SSchJob  *pJob = NULL;

  pJob = schAcquireJob(rId);
  if (NULL == pJob) {
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
    SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
  }

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_ERR_JRET(schGetTaskInJob(pJob, tId, &pTask));

  // The task stays locked on success; schProcessOnCbEnd() in this file unlocks it.
  SCH_LOCK_TASK(pTask);

  *job = pJob;
  *task = pTask;

  return TSDB_CODE_SUCCESS;

_return:

  // Failure path: undo whatever was taken above.
  if (pTask) {
    SCH_UNLOCK_TASK(pTask);
  }

  if (pJob) {
    schReleaseJob(rId);
  }

  SCH_RET(code);
}