schTask.c 30.1 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

/*
 * Release the resources held by a task.
 *
 * Calls schDeregisterTaskHb() for the task, then frees the candidate address
 * array, the cached message buffer, the children/parents arrays, the
 * execNodes hash and the profile.execTime array. The SSchTask structure
 * itself is not freed here.
 *
 * taosArrayDestroy() and taosHashCleanup() are already invoked on possibly
 * NULL pointers elsewhere in this file, so no NULL guards are needed. Every
 * pointer is reset to NULL after release so that a repeated call, or a call
 * after a failed schInitTask(), cannot free the same object twice.
 *
 * @param pJob   job owning the task
 * @param pTask  task whose resources are released
 */
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  schDeregisterTaskHb(pJob, pTask);

  taosArrayDestroy(pTask->candidateAddrs);
  pTask->candidateAddrs = NULL;

  taosMemoryFreeClear(pTask->msg);

  taosArrayDestroy(pTask->children);
  pTask->children = NULL;

  taosArrayDestroy(pTask->parents);
  pTask->parents = NULL;

  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
}

D
dapan1121 已提交
48
/*
 * Compute the retry and execution limits of a task.
 *
 * maxRetryTimes is SCH_DEFAULT_MAX_RETRY_NUM, except for a non data-bind task
 * of a query job under the SCH_ALL policy, where it is the larger of the
 * default and the number of entries in pJob->nodeList.
 * maxExecTimes is maxRetryTimes multiplied by (pLevel->level + 1).
 *
 * @param pJob    job owning the task
 * @param pTask   task whose limits are set
 * @param pLevel  level the task belongs to
 */
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  bool retryPerNode =
      !SCH_IS_DATA_BIND_TASK(pTask) && SCH_IS_QUERY_JOB(pJob) && (SCH_ALL == schMgmt.cfg.schPolicy);

  if (retryPerNode) {
    int32_t nodeCnt = taosArrayGetSize(pJob->nodeList);
    pTask->maxRetryTimes = TMAX(nodeCnt, SCH_DEFAULT_MAX_RETRY_NUM);
  } else {
    pTask->maxRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM;
  }

  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}

D
dapan1121 已提交
59
/*
 * Initialize a task for the given sub-plan and level.
 *
 * Binds the plan and level, resets execId to -1, applies the default timeout,
 * assigns a new task id, computes the retry/exec limits and allocates the
 * execNodes hash and the profile.execTime array (both sized by maxExecTimes).
 * On success the task status is set to JOB_TASK_STATUS_INIT.
 *
 * If either allocation fails, whatever was allocated is released and the
 * corresponding task fields are reset to NULL, so that a later schFreeTask()
 * on the same task does not release them a second time.
 *
 * @param pJob    job owning the task
 * @param pTask   task to initialize
 * @param pPlan   sub-plan executed by the task
 * @param pLevel  level the task belongs to
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->execId = -1;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->taskId = schGenTaskId();

  schInitTaskRetryTimes(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  SCH_TASK_DLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);

  return TSDB_CODE_SUCCESS;

_return:

  // Release partial allocations and drop the stale pointers from the task.
  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  SCH_RET(code);
}

/*
 * Copy the candidate address at pTask->candidateIdx into pTask->succeedAddr.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when the
 *         candidate array has no entry at the current index
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pCurAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  if (!pCurAddr) {
    int32_t candNum = (int32_t)taosArrayGetSize(pTask->candidateAddrs);
    SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx, candNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  pTask->succeedAddr = *pCurAddr;

  return TSDB_CODE_SUCCESS;
}

/*
 * Record in pTask->execNodes that execution `execId` runs on `addr`.
 *
 * The stored entry carries a copy of *addr and the task's current handle.
 *
 * @param addr    node address used for this execution
 * @param execId  execution id used as the hash key
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the hash put fails
 */
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo entry = {0};
  entry.addr = *addr;
  entry.handle = SCH_GET_TASK_HANDLE(pTask);

  int32_t putCode = taosHashPut(pTask->execNodes, &execId, sizeof(execId), &entry, sizeof(entry));
  if (0 != putCode) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, entry.handle);

  return TSDB_CODE_SUCCESS;
}

/*
 * Remove the execNodes entry keyed by `execId`.
 *
 * Returns success immediately when the task has no execNodes hash. After the
 * removal attempt, TSDB_CODE_SCH_IGNORE_ERROR is returned if `execId` is not
 * the task's current execId or the task is waiting for a retry.
 *
 * @param handle  not used by this function
 * @param execId  execution id to drop
 */
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (NULL == pTask->execNodes) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t rmCode = taosHashRemove(pTask->execNodes, &execId, sizeof(execId));
  if (0 == rmCode) {
    SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
  } else {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
  }

  bool isCurrentExec = (execId == pTask->execId) && !pTask->waitRetry;
  if (!isCurrentExec) {  // ignore it
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Store `handle` in the execNodes entry keyed by `execId`, if that entry exists.
 *
 * Nothing is done when the hash is empty or has no entry for `execId`.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (taosHashGetSize(pTask->execNodes) > 0) {
    SSchNodeInfo *pEntry = taosHashGet(pTask->execNodes, &execId, sizeof(execId));

    if (pEntry) {
      pEntry->handle = handle;
      SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
    } else {  // ignore it
      SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                    pTask->execId, pTask->waitRetry);
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Apply a handle reported for execution `execId`.
 *
 * With dropExecNode set, the work is delegated to schDropTaskExecNode() and
 * its result is returned. Otherwise the execNodes entry is updated, and the
 * task-level handle is set only when `execId` is the task's current execId
 * and the task is not waiting for a retry; in any other case
 * TSDB_CODE_SCH_IGNORE_ERROR is returned.
 */
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  (void)schUpdateTaskExecNode(pJob, pTask, handle, execId);

  bool isCurrentExec = (execId == pTask->execId) && !pTask->waitRetry;
  if (!isCurrentExec) {  // ignore it
    SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);

  return TSDB_CODE_SUCCESS;
}

/*
 * Process a failed execution of a task.
 *
 * Flow:
 *  1. TSDB_CODE_SCH_IGNORE_ERROR is returned unchanged.
 *  2. If schJobNeedToStop() reports true, TSDB_CODE_SCH_IGNORE_ERROR is returned.
 *  3. A task already in FAIL or SUCC status yields TSDB_CODE_SCH_STATUS_ERROR.
 *  4. schTaskCheckSetRetry() decides whether to retry; on retry the task is
 *     handed to schHandleTaskRetry().
 *  5. Otherwise the task is marked FAIL. When SCH_JOB_NEED_WAIT(pJob) holds,
 *     the level's failure counter is bumped under the level lock and the job
 *     error code is updated; TSDB_CODE_SCH_IGNORE_ERROR is returned while
 *     other tasks of the level are still outstanding, else the job's error
 *     code. When it does not hold, errCode is returned.
 *
 * @param errCode  error reported for the task
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  int8_t curJobStatus = 0;
  if (schJobNeedToStop(pJob, &curJobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(curJobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t curTaskStatus = SCH_GET_TASK_STATUS(pTask);
  bool   alreadyDone = (JOB_TASK_STATUS_FAIL == curTaskStatus) || (JOB_TASK_STATUS_SUCC == curTaskStatus);
  if (alreadyDone) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(curTaskStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode) {
    SCH_LOG_TASK_END_TS(pTask);
  } else {
    SCH_LOG_TASK_WAIT_TS(pTask);
  }

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  bool needRetry = false;
  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

  if (!SCH_JOB_NEED_WAIT(pJob)) {
    SCH_RET(errCode);
  }

  // Count this failure and snapshot the number of finished tasks under the level lock.
  SCH_LOCK(SCH_WRITE, &pTask->level->lock);
  pTask->level->taskFailed++;
  int32_t taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
  SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

  schUpdateJobErrCode(pJob, errCode);

  if (taskDone < pTask->level->taskNum) {
    SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
    SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_RET(atomic_load_32(&pJob->errCode));
}

// Note: no more task error processing, handled in function internal
/*
 * Process the successful execution of a task.
 *
 * Marks the task PART_SUCC, records the address it succeeded on and calls
 * schLaunchTasksInFlowCtrlList().
 *
 * A task without parents:
 *  - if SCH_JOB_NEED_WAIT(pJob) holds, the level's success counter is bumped
 *    under the level lock; the function returns success while other tasks of
 *    the level are outstanding, otherwise it either triggers job failure
 *    handling (some task of the level failed) or switches the job to PART_SUCC;
 *  - otherwise the task becomes the job's fetch task, its address becomes the
 *    job's result node, and the job is switched to PART_SUCC.
 *
 * A task with parents registers itself as a downstream source on every
 * parent's plan (under the parent's planLock), launches each parent that
 * SCH_TASK_READY_FOR_LAUNCH reports ready, and finally calls
 * schLaunchJobLowerLevel().
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));

  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = 0;
  if (pTask->parents) {
    parentNum = (int32_t)taosArrayGetSize(pTask->parents);
  }

  if (0 == parentNum) {
    if (!SCH_JOB_NEED_WAIT(pJob)) {
      pJob->resNode = pTask->succeedAddr;
      pJob->fetchTask = pTask;

      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    }

    SCH_LOCK(SCH_WRITE, &pTask->level->lock);
    pTask->level->taskSucceed++;
    int32_t taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

    if (taskDone < pTask->level->taskNum) {
      SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
      return TSDB_CODE_SUCCESS;
    }

    if (taskDone > pTask->level->taskNum) {
      SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
    }

    if (pTask->level->taskFailed > 0) {
      SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
    }

    SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
  }

  for (int32_t idx = 0; idx < parentNum; ++idx) {
    SSchTask **ppParent = (SSchTask **)taosArrayGet(pTask->parents, idx);
    SSchTask  *pParent = *ppParent;

    SCH_LOCK(SCH_WRITE, &pParent->planLock);
    SDownstreamSourceNode source = {0};
    source.type = QUERY_NODE_DOWNSTREAM_SOURCE;
    source.taskId = pTask->taskId;
    source.schedId = schMgmt.sId;
    source.execId = pTask->execId;
    source.addr = pTask->succeedAddr;
    source.fetchMsgType = SCH_FETCH_TYPE(pTask);
    qSetSubplanExecutionNode(pParent->plan, pTask->plan->id.groupId, &source);
    SCH_UNLOCK(SCH_WRITE, &pParent->planLock);

    int32_t readyNum = atomic_add_fetch_32(&pParent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, pParent)) {
      SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, pParent->taskId);
      SCH_ERR_RET(schLaunchTask(pJob, pParent));
    }
  }

  SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

/*
 * Reschedule a timed-out task, when rescheduling is enabled.
 *
 * Nothing happens for data-bind tasks or when schMgmt.cfg.enableReSchedule is
 * off. A task is rescheduled only if SCH_TASK_TIMEOUT() holds, its status is
 * JOB_TASK_STATUS_EXEC, it is not the job's fetch task and it has more than
 * one candidate address. The task is then dropped on its exec nodes, its
 * execNodes hash is cleared and it is fed into schProcessOnTaskFailure()
 * with TSDB_CODE_SCH_TIMEOUT_ERROR.
 */
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule || SCH_IS_DATA_BIND_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  if (!SCH_TASK_TIMEOUT(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  if (JOB_TASK_STATUS_EXEC != pTask->status || pJob->fetchTask == pTask) {
    return TSDB_CODE_SUCCESS;
  }

  if (taosArrayGetSize(pTask->candidateAddrs) <= 1) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);

  SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));

  return TSDB_CODE_SUCCESS;
}

335
/*
 * Reset a task and launch it again after a redirect.
 *
 * A NULL pData resets pTask->retryTimes to 0. If the exec or retry limit is
 * reached, job failure handling is invoked with rspCode and success is
 * returned. Otherwise the task is marked waitRetry, dropped on its exec
 * nodes, removed from the exec list, and its cached message and
 * succeedAddr are cleared.
 *
 * Data-bind task: the single candidate address is updated from pData->pEpSet
 * (when pData is given) and the task is launched again.
 * Other task (merge plan): its sub-plan execution nodes are cleared, the
 * candidate address is switched when pData is NULL or empty, and every child
 * task is redirected recursively with a NULL pData while its task lock is held.
 *
 * Any failure after the reset is routed through schProcessOnTaskFailure().
 *
 * @param pData    response buffer, may be NULL
 * @param rspCode  response code that triggered the redirect
 */
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  if (NULL == pData) {
    pTask->retryTimes = 0;
  }

  bool execExhausted = (pTask->execId + 1) >= pTask->maxExecTimes;
  bool retryExhausted = (pTask->retryTimes + 1) > pTask->maxRetryTimes;
  if (execExhausted || retryExhausted) {
    SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes,
                  pTask->maxExecTimes, pTask->execId);
    schHandleJobFailure(pJob, rspCode);
    return TSDB_CODE_SUCCESS;
  }

  // Tear down the state of the current execution before launching again.
  pTask->waitRetry = true;
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);
  schRemoveTaskFromExecList(pJob, pTask);
  schDeregisterTaskHb(pJob, pTask);
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
  taosMemoryFreeClear(pTask->msg);
  pTask->msgLen = 0;
  pTask->lastMsgType = 0;
  memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    if (pData) {
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
    }

    if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask) && JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) {
      SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

    SCH_ERR_JRET(schLaunchTask(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  // merge plan

  pTask->childReady = 0;

  qClearSubplanExecutionNode(pTask->plan);

  // Note: current error task and upper level merge task
  if (NULL == pData || 0 == pData->len) {
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  int32_t childCnt = taosArrayGetSize(pTask->children);
  for (int32_t childIdx = 0; childIdx < childCnt; ++childIdx) {
    SSchTask *pChild = taosArrayGetP(pTask->children, childIdx);
    SCH_LOCK_TASK(pChild);
    schDoTaskRedirect(pJob, pChild, NULL, rspCode);
    SCH_UNLOCK_TASK(pChild);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

408
/*
 * Entry point for handling a redirect response for a task.
 *
 * If the job is in PART_SUCC status and its result has already been fetched,
 * the redirect is treated as a failure with rspCode; otherwise the job is
 * moved back to EXEC status. A data-bind task additionally requires an
 * updated ep set in pData. The redirect itself is done by schDoTaskRedirect().
 *
 * On every path the buffers referenced by pData (pData->pData and
 * pData->pEpSet) are freed here. The pointers are cleared afterwards so the
 * caller-owned SDataBuf never keeps dangling references. A NULL pData is
 * tolerated, matching schDoTaskRedirect().
 *
 * @param pData    response buffer; its payload and ep set are released here
 * @param rspCode  response code that triggered the redirect
 * @return result of schDoTaskRedirect(), or of schProcessOnTaskFailure() on
 *         the error paths
 */
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  if (JOB_TASK_STATUS_PART_SUCC == pJob->status) {
    SCH_LOCK(SCH_WRITE, &pJob->resLock);
    if (pJob->fetched) {
      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
      SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
    }
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

    schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    if (NULL == pData || NULL == pData->pEpSet) {
      SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
    }
  }

  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);

  if (pData) {
    // Free and clear so the caller cannot free or reuse these buffers again.
    taosMemoryFree(pData->pData);
    pData->pData = NULL;
    taosMemoryFree(pData->pEpSet);
    pData->pEpSet = NULL;
  }

  SCH_RET(code);

_return:

  if (pData) {
    taosMemoryFree(pData->pData);
    pData->pData = NULL;
    taosMemoryFree(pData->pEpSet);
    pData->pEpSet = NULL;
  }

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

/*
 * Insert the task pointer into pJob->execTasks, keyed by its task id.
 *
 * @return TSDB_CODE_SUCCESS on insertion,
 *         TSDB_CODE_SCH_INTERNAL_ERROR if the put reports an existing node,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY for any other put failure
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putCode = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);

  if (0 != putCode) {
    if (!HASH_NODE_EXIST(putCode)) {
      SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_ELOG("task already in execTask list, code:%x", putCode);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

/*
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}
*/

/*
 * Decide whether a failed task should be retried.
 *
 * For TSDB_CODE_SCH_TIMEOUT_ERROR both limits (maxExecTimes, maxRetryTimes)
 * are raised by one and the task timeout is doubled, capped at
 * SCH_MAX_TASK_TIMEOUT_USEC.
 *
 * *needRetry is set to false when the retry limit or the exec limit is
 * reached, or when SCH_NEED_RETRY() rejects the error for the task's last
 * message type; otherwise it is set to true.
 *
 * @param errCode    error reported for the task
 * @param needRetry  out: whether the task should be retried
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    ++pTask->maxExecTimes;
    ++pTask->maxRetryTimes;

    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
  } else if ((pTask->execId + 1) >= pTask->maxExecTimes) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes);
  } else if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
  } else {
    *needRetry = true;
    SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Prepare a failed task for another attempt and launch it.
 *
 * Decrements the level's launched-task counter, removes the task from the
 * exec list and resets its status to INIT. When SCH_TASK_NEED_FLOW_CTRL()
 * holds, schLaunchTasksInFlowCtrlList() is called first. A data-bind task
 * applies SCH_SWITCH_EPSET() to its current candidate address; any other
 * task switches to another candidate address. Finally the task is launched.
 *
 * @return TSDB_CODE_SUCCESS, or the first error reported by a callee
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  schRemoveTaskFromExecList(pJob, pTask);
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (!SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  } else {
    SQueryNodeAddr *pCurAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
    SCH_SWITCH_EPSET(pCurAddr);
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

/*
 * Append the address of every entry in pJob->nodeList to the task's
 * candidate address array.
 *
 * @return TSDB_CODE_SUCCESS when at least one address was added,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if a push fails,
 *         TSDB_CODE_TSC_NO_EXEC_NODE when the node list is missing or empty
 */
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t nodeNum = 0;
  if (pJob->nodeList) {
    nodeNum = taosArrayGetSize(pJob->nodeList);
  }

  int32_t addNum = 0;
  int32_t i = 0;
  while (i < nodeNum) {
    SQueryNodeLoad *pLoad = taosArrayGet(pJob->nodeList, i);
    SQueryNodeAddr *pNodeAddr = &pLoad->addr;

    if (NULL == taosArrayPush(pTask->candidateAddrs, pNodeAddr)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, pNodeAddr->nodeId,
                  pNodeAddr->epSet.inUse, pNodeAddr->epSet.numOfEps, SCH_GET_CUR_EP(pNodeAddr)->fqdn,
                  SCH_GET_CUR_EP(pNodeAddr)->port);

    ++addNum;
    ++i;
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Build the task's candidate address list, unless it already exists.
 *
 * The plan's execNode is used as the only candidate when it carries at least
 * one endpoint. Without one, a data-bind task fails with
 * TSDB_CODE_QRY_APP_ERROR, and any other task takes its candidates from the
 * job's node list via schSetAddrsFromNodeList().
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation/push
 *         failure, TSDB_CODE_QRY_APP_ERROR as described above, or the error
 *         from schSetAddrsFromNodeList()
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps <= 0) {
    // No endpoint in the plan: only non data-bind tasks can fall back to the node list.
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
      SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
    }

    SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
    SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

  return TSDB_CODE_SUCCESS;
}

696
/*
 * Replace the ep set of the task's single candidate address with *pEpSet.
 *
 * The update is only possible when the task has exactly one candidate
 * address; otherwise TSDB_CODE_APP_ERROR is returned.
 *
 * @param pEpSet  new ep set, copied into the candidate address
 */
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SQueryNodeAddr *pTarget = taosArrayGet(pTask->candidateAddrs, 0);

  SEp *pPrevEp = &pTarget->epSet.eps[pTarget->epSet.inUse];
  SEp *pNextEp = &pEpSet->eps[pEpSet->inUse];

  SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pPrevEp->fqdn, pPrevEp->port, pNextEp->fqdn, pNextEp->port);

  pTarget->epSet = *pEpSet;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
715
/*
 * Move pTask->candidateIdx to another candidate address.
 *
 * With at most one candidate the index is left unchanged. Under SCH_RANDOM a
 * random index different from the current one is drawn; every other policy
 * advances the index by one and wraps to 0 at the end of the list.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

  if (candidateNum > 1) {
    if (SCH_RANDOM == schMgmt.cfg.schPolicy) {
      int32_t prevIdx = pTask->candidateIdx;
      do {
        pTask->candidateIdx = taosRand() % candidateNum;
      } while (prevIdx == pTask->candidateIdx);
    } else {
      // SCH_LOAD_SEQ, SCH_ALL and any unknown policy: round robin
      if (++pTask->candidateIdx >= candidateNum) {
        pTask->candidateIdx = 0;
      }
    }
  }

  SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
745 746 747
/*
 * Remove the task from pJob->execTasks. A failed removal is only logged.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t rmCode = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));

  if (0 != rmCode) {
    SCH_TASK_WLOG("task already not in execTask list, code:%x", rmCode);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Send a TDMT_SCH_DROP_TASK message to every entry of pTask->execNodes that
 * carries a handle.
 *
 * Returns early (after a debug log) when the task has no execNodes table or
 * the table is empty. For each node with a handle, the handle is installed on
 * the task before the drop message is built and sent. The result of
 * schBuildAndSendMsg is not inspected.
 */
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosHashGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t idx = 0;
  for (SSchNodeInfo *pNode = taosHashIterate(pTask->execNodes, NULL); pNode;
       pNode = taosHashIterate(pTask->execNodes, pNode), ++idx) {
    if (!pNode->handle) {
      SCH_TASK_DLOG("no need to drop task %dth execNode", idx);
      continue;
    }

    // the drop message is sent through the handle recorded for this exec node
    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_SCH_DROP_TASK);
    SCH_TASK_DLOG("start to drop task's %dth execNode", idx);
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", nodeNum);
}

785 786
/*
 * Process the list of task statuses carried by a heartbeat response.
 *
 * pEpId       - endpoint the response came from (used for logging only)
 * pStatusList - array of STaskStatus entries
 *
 * For each entry the job/task pair is looked up through schProcessOnCbBegin;
 * entries for which that call returns non-zero are skipped. Otherwise:
 *   - an execId different from the task's current execId is only logged;
 *   - JOB_TASK_STATUS_FAIL is currently not acted upon (TODO: record and handle the error);
 *   - JOB_TASK_STATUS_INIT triggers schRescheduleTask;
 * and schProcessOnCbEnd is then called with the resulting code (0 unless the
 * reschedule returned something else).
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  SSchJob  *pJob = NULL;
  SSchTask *pTask = NULL;
  int32_t   taskNum = (int32_t)taosArrayGetSize(pStatusList);

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t idx = 0; idx < taskNum; ++idx) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, idx);
    int32_t      endCode = 0;

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
           pStatus->execId, jobTaskStatusStr(pStatus->status));

    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
      continue;
    }

    if (pStatus->execId != pTask->execId) {
      // TODO: status belongs to another execution of this task; nothing is done with it yet
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
    } else if (JOB_TASK_STATUS_FAIL == pStatus->status) {
      // TODO: record and handle the error
    } else if (JOB_TASK_STATUS_INIT == pStatus->status) {
      endCode = schRescheduleTask(pJob, pTask);
    }

    schProcessOnCbEnd(pJob, pTask, endCode);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
827 828
int32_t schLaunchTaskImpl(void *param) {
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
dengyihao's avatar
dengyihao 已提交
829
  SSchJob     *pJob = schAcquireJob(pCtx->jobRid);
830 831
  if (NULL == pJob) {
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
dengyihao's avatar
dengyihao 已提交
832
    taosMemoryFree(param);
833 834
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
  }
dengyihao's avatar
dengyihao 已提交
835

D
dapan1121 已提交
836
  SSchTask *pTask = pCtx->pTask;
837 838 839 840 841

  if (pCtx->asyncLaunch) {
    SCH_LOCK_TASK(pTask);
  }
  
dengyihao's avatar
dengyihao 已提交
842 843
  int8_t    status = 0;
  int32_t   code = 0;
D
dapan1121 已提交
844 845 846

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
  pTask->execId++;
D
dapan1121 已提交
847
  pTask->retryTimes++;
D
dapan1121 已提交
848
  pTask->waitRetry = false;
D
dapan1121 已提交
849

D
dapan1121 已提交
850
  SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes);
D
dapan1121 已提交
851 852 853 854

  SCH_LOG_TASK_START_TS(pTask);

  if (schJobNeedToStop(pJob, &status)) {
D
dapan1121 已提交
855
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
D
dapan1121 已提交
856
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
D
dapan1121 已提交
857 858 859 860
  }

  // NOTE: race condition: the task should be put into the hash table before send msg to server
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
D
dapan1121 已提交
861
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
D
dapan1121 已提交
862 863 864 865 866 867
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
  }

  SSubplan *plan = pTask->plan;

  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
868
    code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
D
dapan1121 已提交
869 870 871
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
D
dapan1121 已提交
872
      SCH_ERR_JRET(code);
D
dapan1121 已提交
873 874 875 876 877
    } else {
      SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
    }
  }

D
dapan1121 已提交
878
  SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
D
dapan1121 已提交
879 880

  if (SCH_IS_QUERY_JOB(pJob)) {
D
dapan1121 已提交
881
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
D
dapan1121 已提交
882 883
  }

D
dapan1121 已提交
884 885 886 887
  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));

_return:

D
dapan1121 已提交
888 889 890 891 892 893 894
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    if (code) {
      code = schProcessOnTaskFailure(pJob, pTask, code);
    }
    if (code) {
      code = schHandleJobFailure(pJob, code);
    }
D
dapan1121 已提交
895 896
  }

897 898 899 900
  if (pCtx->asyncLaunch) {
    SCH_UNLOCK_TASK(pTask);
  }

901 902
  schReleaseJob(pJob->refId);

903 904
  taosMemoryFree(param);

D
dapan1121 已提交
905 906 907
  SCH_RET(code);
}

dengyihao's avatar
dengyihao 已提交
908
/*
 * Build a launch context for pTask and run schLaunchTaskImpl on it.
 *
 * Jobs with fewer than SCH_MIN_AYSNC_EXEC_NUM tasks are launched inline and
 * the launch result is propagated on failure. Larger jobs are flagged
 * asyncLaunch and handed to taosAsyncExec.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the context cannot be allocated, the
 * inline launch error if any, otherwise TSDB_CODE_SUCCESS.
 */
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  SSchTaskCtx *pCtx = taosMemoryCalloc(1, sizeof(*pCtx));
  if (NULL == pCtx) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pCtx->pTask = pTask;
  pCtx->jobRid = pJob->refId;

  if (pJob->taskNum < SCH_MIN_AYSNC_EXEC_NUM) {
    // small job: launch on the caller's thread (schLaunchTaskImpl frees pCtx)
    SCH_ERR_RET(schLaunchTaskImpl(pCtx));
    return TSDB_CODE_SUCCESS;
  }

  pCtx->asyncLaunch = true;
  // NOTE(review): the result of taosAsyncExec is not checked here - confirm whether it can fail
  taosAsyncExec(schLaunchTaskImpl, pCtx, NULL);

  return TSDB_CODE_SUCCESS;
}

/*
 * Launch pTask, honoring flow control when the task requires it.
 *
 * The task handle is reset to NULL first. If flow control applies, the task is
 * launched only when schCheckIncTaskFlowQuota reports enough quota; otherwise
 * it is launched unconditionally.
 *
 * Note: callers need no further error processing - any failure is routed
 * through schProcessOnTaskFailure here, and its result is returned.
 */
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  bool    canLaunch = true;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    // under flow control the quota check decides whether we may launch now
    canLaunch = false;
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &canLaunch));
  }

  if (canLaunch) {
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

/*
 * Launch every task of one level, in subTasks order.
 *
 * schChkJobNeedFlowCtrl is run for the level first. The first non-success
 * result from that check or from any schLaunchTask call is returned and stops
 * the iteration; otherwise TSDB_CODE_SUCCESS is returned.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  int32_t idx = 0;
  while (idx < level->taskNum) {
    SSchTask *pCurTask = taosArrayGet(level->subTasks, idx);

    SCH_ERR_RET(schLaunchTask(pJob, pCurTask));

    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Call schDropTaskOnExecNode for every task stored in the given hash table.
 *
 * Nothing is done unless SCH_JOB_NEED_DROP(pJob) holds. Each hash entry is
 * read as a pointer to an SSchTask pointer.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (SCH_JOB_NEED_DROP(pJob)) {
    for (void *pEntry = taosHashIterate(list, NULL); pEntry; pEntry = taosHashIterate(list, pEntry)) {
      SSchTask *pTask = *(SSchTask **)pEntry;

      schDropTaskOnExecNode(pJob, pTask);
    }
  }
}

/*
 * Send the fetch request for the job's fetch task to pJob->resNode.
 *
 * If pJob->fetchRes is already set (read atomically) nothing is sent and
 * TSDB_CODE_SUCCESS is returned.
 *
 * Note: callers need no further error processing - a send failure is routed
 * through schProcessOnTaskFailure here, and its result is returned.
 */
int32_t schLaunchFetchTask(SSchJob *pJob) {
  int32_t code = 0;
  void   *pFetched = atomic_load_ptr(&pJob->fetchRes);

  if (NULL != pFetched) {
    SCH_JOB_DLOG("res already fetched, res:%p", pFetched);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}