schTask.c 38.1 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
H
Hongze Cheng 已提交
19
#include "qworker.h"
D
dapan1121 已提交
20
#include "schInt.h"
H
Hongze Cheng 已提交
21
#include "tglobal.h"
D
dapan1121 已提交
22 23 24 25 26 27
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

/*
 * Release the resources owned by a task.
 *
 * The task's heartbeat registration is dropped first via schDeregisterTaskHb(),
 * then the candidate address list, the pending message buffer, the
 * children/parents arrays, the execNodes hash and the profile.execTime array
 * are freed. The SSchTask structure itself is not freed here.
 *
 * @param pJob   job the task belongs to
 * @param pTask  task whose resources are released
 */
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  schDeregisterTaskHb(pJob, pTask);

  if (pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
  }

  taosMemoryFreeClear(pTask->msg);

  if (pTask->children) {
    taosArrayDestroy(pTask->children);
  }

  if (pTask->parents) {
    taosArrayDestroy(pTask->parents);
  }

  if (pTask->execNodes) {
    taosHashCleanup(pTask->execNodes);
  }

  taosArrayDestroy(pTask->profile.execTime);
}

D
dapan1121 已提交
50
/*
 * Compute the retry and execution limits of a task.
 *
 * maxRetryTimes is SCH_DEFAULT_MAX_RETRY_NUM for data-bind tasks, non-query
 * jobs, or any scheduling policy other than SCH_ALL; otherwise it is the larger
 * of the job's node count and SCH_DEFAULT_MAX_RETRY_NUM.
 * maxExecTimes is maxRetryTimes scaled by (level + 1).
 *
 * @param pJob    job the task belongs to (its nodeList is read)
 * @param pTask   task whose maxRetryTimes / maxExecTimes are written
 * @param pLevel  level of the task (its level index is read)
 */
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  if (SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy)) {
    pTask->maxRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM;
  } else {
    int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
    pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM);
  }

  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}

D
dapan1121 已提交
61
/*
 * Initialize a task for the given sub-plan and level.
 *
 * Sets the plan/level links, resets execId to -1 and failedExecId to -2,
 * assigns the default timeout and a fresh task id, computes the retry limits
 * and allocates the execNodes hash and the profile.execTime array (both sized
 * by maxExecTimes). On success the task status becomes JOB_TASK_STATUS_INIT.
 *
 * @param pJob    job the task belongs to
 * @param pTask   task to initialize
 * @param pPlan   sub-plan executed by the task
 * @param pLevel  level the task belongs to
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when either container
 *         cannot be allocated
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->execId = -1;
  pTask->failedExecId = -2;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->taskId = schGenTaskId();

  schInitTaskRetryTimes(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  SCH_TASK_DLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);

  return TSDB_CODE_SUCCESS;

_return:

  // Reset the pointers after releasing them so that a later schFreeTask() on
  // this task does not free the same containers a second time.
  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  SCH_RET(code);
}

/*
 * Remember the node address on which the task succeeded.
 *
 * Nothing is recorded for local-exec tasks. Otherwise the candidate address at
 * pTask->candidateIdx is copied into pTask->succeedAddr.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when the candidate
 *         address cannot be fetched
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (NULL == addr) {
    SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                  (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  pTask->succeedAddr = *addr;

  return TSDB_CODE_SUCCESS;
}

/*
 * Register an execution node for one execution attempt of the task.
 *
 * A SSchNodeInfo built from *addr and the task's current handle is stored in
 * pTask->execNodes under the key execId.
 *
 * @param addr    node address the attempt is sent to (copied)
 * @param execId  execution id used as hash key
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the hash insert
 *         fails
 */
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};

  if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle);

  return TSDB_CODE_SUCCESS;
}

/*
 * Remove the exec node entry of one execution attempt.
 *
 * @param handle  not used by this function
 * @param execId  execution id whose entry is removed from pTask->execNodes
 * @return TSDB_CODE_SUCCESS when the task has no execNodes hash, or when the
 *         entry was removed and execId is still the task's current execution
 *         with no retry pending; TSDB_CODE_SCH_IGNORE_ERROR when the entry was
 *         not present, when execId is no longer current, or when the task is
 *         waiting for a retry
 */
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (NULL == pTask->execNodes) {
    return TSDB_CODE_SUCCESS;
  }

  if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  } else {
    SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
  }

  if ((execId != pTask->execId) || pTask->waitRetry) {  // ignore it
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Store a new handle in the exec node entry of one execution attempt.
 *
 * If the execNodes hash is empty, or holds no entry for execId, nothing is
 * changed.
 *
 * @param handle  handle to record for the attempt
 * @param execId  execution id whose entry is updated
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (taosHashGetSize(pTask->execNodes) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
  if (NULL == nodeInfo) {  // ignore it
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
    return TSDB_CODE_SUCCESS;
  }

  nodeInfo->handle = handle;

  SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);

  return TSDB_CODE_SUCCESS;
}

/*
 * Update (or drop) the handle associated with one execution attempt.
 *
 * With dropExecNode set, the exec node entry is removed and the result of
 * schDropTaskExecNode() is returned. Otherwise the handle is stored in the
 * exec node entry and, provided execId is the task's current execution, is
 * newer than failedExecId and no retry is pending, also becomes the task's
 * handle.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_IGNORE_ERROR when the response
 *         belongs to an outdated execution
 */
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  schUpdateTaskExecNode(pJob, pTask, handle, execId);

  if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) {  // ignore it
    SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);

  return TSDB_CODE_SUCCESS;
}

/*
 * Handle the failure of a task execution.
 *
 * TSDB_CODE_SCH_IGNORE_ERROR is passed straight back. Otherwise the current
 * execId is recorded as failedExecId and:
 *   - if the job has to stop, the failure is ignored;
 *   - if the task is already in FAIL or SUCC state, a status error is returned;
 *   - schTaskCheckSetRetry() decides whether the task is retried; a retry is
 *     started via schHandleTaskRetry();
 *   - without retry the task is marked failed. For jobs that wait for all
 *     tasks, the level's failure counter is bumped under the level lock, the
 *     job error code is updated, and the job's error code is returned only
 *     once every task of the level is done.
 *
 * @param errCode  error reported for the task
 * @return TSDB_CODE_SUCCESS when a retry was launched, TSDB_CODE_SCH_IGNORE_ERROR
 *         when the failure needs no further handling by the caller, otherwise
 *         an error code
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  pTask->failedExecId = pTask->execId;

  int8_t jobStatus = 0;
  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
    SCH_LOG_TASK_WAIT_TS(pTask);
  } else {
    SCH_LOG_TASK_END_TS(pTask);
  }

  bool    needRetry = false;
  int32_t taskDone = 0;

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (!needRetry) {
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

    if (SCH_JOB_NEED_WAIT(pJob)) {
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskFailed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      schUpdateJobErrCode(pJob, errCode);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
        SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
      }

      // last task of the level: report the job-wide error code
      SCH_RET(atomic_load_32(&pJob->errCode));
    }
  } else {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  SCH_RET(errCode);
}

/*
 * Handle the successful completion of a task execution.
 *
 * The task is moved to JOB_TASK_STATUS_PART_SUCC, the succeeding node is
 * recorded and tasks held back by flow control are launched.
 *
 * A task without parents is a top-level task:
 *   - for jobs that wait for all tasks, the level's success counter is bumped
 *     under the level lock; once every task of the level is done the job
 *     either fails (if any task of the level failed) or switches to PART_SUCC;
 *   - otherwise the task becomes the job's fetch task and the job switches to
 *     PART_SUCC.
 *
 * A task with parents publishes itself as a downstream source to each parent
 * plan (under the parent's planLock), increments the parent's childReady
 * counter and launches the parent once it is ready. Finally the lower level of
 * the job is launched.
 *
 * @return TSDB_CODE_SUCCESS or the error code of the first failing step
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));

  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum == 0) {
    int32_t taskDone = 0;
    if (SCH_JOB_NEED_WAIT(pJob)) {
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      } else if (taskDone > pTask->level->taskNum) {
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
      }

      if (pTask->level->taskFailed > 0) {
        SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
      } else {
        SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
      }
    } else {
      pJob->resNode = pTask->succeedAddr;
    }

    pJob->fetchTask = pTask;

    SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
  }

  /*
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
      strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
      job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;

      ++job->dataSrcEps.numOfEps;
    }
  */

  for (int32_t i = 0; i < parentNum; ++i) {
    SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);

    // the parent's plan is shared by all of its children, so update it under planLock
    SCH_LOCK(SCH_WRITE, &parent->planLock);
    SDownstreamSourceNode source = {
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
        .taskId = pTask->taskId,
        .schedId = schMgmt.sId,
        .execId = pTask->execId,
        .addr = pTask->succeedAddr,
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
    };
    qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
    SCH_UNLOCK(SCH_WRITE, &parent->planLock);

    int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
      SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
      SCH_ERR_RET(schLaunchTask(pJob, parent));
    }
  }

  SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

/*
 * Reschedule a task that has timed out while executing.
 *
 * Nothing happens when rescheduling is disabled in the scheduler
 * configuration or when the task is a data-bind task. A task is rescheduled
 * only if it timed out, is in EXEC state, is not the job's fetch task and has
 * more than one candidate address: it is then dropped on its exec nodes, its
 * execNodes hash is cleared and the timeout is processed as a task failure.
 *
 * @return TSDB_CODE_SUCCESS, or the error returned by the failure processing
 */
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule) {
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask &&
      taosArrayGetSize(pTask->candidateAddrs) > 1) {
    SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
    schDropTaskOnExecNode(pJob, pTask);
    taosHashClear(pTask->execNodes);

    SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));
  }

  return TSDB_CODE_SUCCESS;
}

346
/*
 * Update the redirect bookkeeping of a task and decide the next retry delay.
 *
 * On the first redirect the context is initialized: the period starts at
 * tsRedirectPeriod, the start timestamp is taken and roundTotal is the number
 * of endpoints to try in one round (from pEpSet, or from the first candidate
 * address when pEpSet is NULL, for data-bind tasks; 1 otherwise).
 *
 * On later redirects the counters are advanced. When a full round has been
 * tried, the elapsed time is checked against tsMaxRetryWaitTime; if exceeded
 * an error is returned, otherwise the period is multiplied by tsRedirectFactor
 * (capped at tsRedirectMaxPeriod) and pTask->delayExecMs is set to the smaller
 * of the period and the time left. Within a round no delay is applied.
 *
 * @param pEpSet   endpoint set carried by the redirect response, may be NULL
 * @param rspCode  response code that triggered the redirect
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR when the first
 *         candidate address cannot be fetched, or the redirect error code once
 *         the retry time budget is exhausted
 */
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) {
  SSchRedirectCtx *pCtx = &pTask->redirectCtx;
  if (!pCtx->inRedirect) {
    pCtx->inRedirect = true;
    pCtx->periodMs = tsRedirectPeriod;
    pCtx->startTs = taosGetTimestampMs();

    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      if (pEpSet) {
        pCtx->roundTotal = pEpSet->numOfEps;
      } else {
        SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
        if (NULL == pAddr) {
          // same guard as schRecordTaskSucceedNode(): never dereference a missing candidate
          SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", 0,
                        (int32_t)taosArrayGetSize(pTask->candidateAddrs));
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pCtx->roundTotal = pAddr->epSet.numOfEps;
      }
    } else {
      pCtx->roundTotal = 1;
    }

    goto _return;
  }

  pCtx->totalTimes++;
  pCtx->roundTimes++;

  if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
    pCtx->roundTotal = pEpSet->numOfEps;
  }

  if (pCtx->roundTimes >= pCtx->roundTotal) {
    int64_t nowTs = taosGetTimestampMs();
    int64_t lastTime = nowTs - pCtx->startTs;
    if (lastTime > tsMaxRetryWaitTime) {
      SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
                    nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
      SCH_ERR_RET(SCH_GET_REDICT_CODE(pJob, rspCode));
    }

    // back off: grow the wait period, bounded by tsRedirectMaxPeriod
    pCtx->periodMs *= tsRedirectFactor;
    if (pCtx->periodMs > tsRedirectMaxPeriod) {
      pCtx->periodMs = tsRedirectMaxPeriod;
    }

    // never wait longer than what is left of the retry time budget
    int64_t leftTime = tsMaxRetryWaitTime - lastTime;
    pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;

    pCtx->roundTimes = 0;

    goto _return;
  }

  pTask->delayExecMs = 0;

_return:

  SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal,
                pCtx->totalTimes, pTask->delayExecMs);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
407
/*
 * Reset the execution state of a task so that it can be launched again.
 *
 * Marks the task as waiting for retry, drops it on its exec nodes, clears the
 * execNodes hash, removes it from the job's exec list, deregisters its
 * heartbeat, decrements the level's launched (and, if the task had finished
 * executing, exec-done) counters and clears the pending message, last message
 * type and succeed address.
 */
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
  pTask->waitRetry = true;

  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);
  schRemoveTaskFromExecList(pJob, pTask);
  schDeregisterTaskHb(pJob, pTask);
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
  if (SCH_TASK_EXEC_DONE(pTask)) {
    atomic_sub_fetch_32(&pTask->level->taskExecDoneNum, 1);
  }
  taosMemoryFreeClear(pTask->msg);
  pTask->msgLen = 0;
  pTask->lastMsgType = 0;
  memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}

/*
 * Redirect a task to another endpoint/node and schedule its re-execution.
 *
 * The redirect context is updated first (which may fail once the retry time
 * budget is used up) and the task is reset for retry.
 *
 * Data-bind tasks keep their node: the candidate's endpoint set is replaced by
 * the one in the response, kept as is for a self-leader redirect error, or
 * switched to the next endpoint otherwise; the task is then launched with the
 * computed delay.
 *
 * Other (merge) tasks clear their child-ready counter and the execution nodes
 * of their plan, switch to another candidate node when no response payload is
 * present, and recursively redirect all of their children (each under its task
 * lock; the children's return codes are not checked).
 *
 * @param pData    response data, may be NULL
 * @param rspCode  response code that triggered the redirect
 * @return TSDB_CODE_SUCCESS, or the result of schProcessOnTaskFailure() when a
 *         step fails
 */
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));

  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
    SCH_UPDATE_REDICT_CODE(pJob, rspCode);
  }

  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));

  schResetTaskForRetry(pJob, pTask);

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    if (pData && pData->pEpSet) {
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
    } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
      SEp            *pEp = &addr->epSet.eps[addr->epSet.inUse];
      SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
                    addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
    } else {
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
      SCH_SWITCH_EPSET(addr);
      SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  // merge plan

  pTask->childReady = 0;

  qClearSubplanExecutionNode(pTask->plan);

  // Note: current error task and upper level merge task
  if ((pData && 0 == pData->len) || NULL == pData) {
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  int32_t childrenNum = taosArrayGetSize(pTask->children);
  for (int32_t i = 0; i < childrenNum; ++i) {
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
    SCH_LOCK_TASK(pChild);
    schDoTaskRedirect(pJob, pChild, NULL, rspCode);
    SCH_UNLOCK_TASK(pChild);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

D
dapan1121 已提交
486
/*
 * Entry point for retrying a task after a redirect-type response.
 *
 * The job-level retry state is checked/reset first. For an "other leader"
 * redirect error the response must carry a new endpoint set; a response
 * without one is treated as TSDB_CODE_INVALID_MSG. The task is then redirected
 * via schDoTaskRedirect(). The payload and endpoint set held by pData are
 * freed on every path; pData itself is not freed.
 *
 * @param pData    response data; a NULL pointer is tolerated
 * @param rspCode  response code that triggered the retry
 * @return result of schDoTaskRedirect(), or of schProcessOnTaskFailure() when
 *         a precondition fails
 */
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));

  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
    if (NULL == pData || NULL == pData->pEpSet) {
      SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
      code = TSDB_CODE_INVALID_MSG;
      goto _return;
    }
  }

  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);

  if (pData) {
    taosMemoryFreeClear(pData->pData);
    taosMemoryFreeClear(pData->pEpSet);
  }

  SCH_RET(code);

_return:

  if (pData) {
    taosMemoryFreeClear(pData->pData);
    taosMemoryFreeClear(pData->pEpSet);
  }

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

/*
 * Add a task to the job's table of executing tasks, keyed by task id.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SCH_INTERNAL_ERROR when the task is
 *         already in the table; TSDB_CODE_OUT_OF_MEMORY for any other insert
 *         failure
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      SCH_TASK_ELOG("task already in execTask list, code:%x", code);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

/*
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
548
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
575
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
600
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
601 602 603 604 605 606 607 608 609 610 611 612 613
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}
*/

/*
 * Decide whether a failed task should be retried.
 *
 * A timeout does not consume the retry budget: both limits are raised by one
 * and the task timeout is doubled, capped at SCH_MAX_TASK_TIMEOUT_USEC.
 * No retry is requested when the retry count or the execution count would
 * exceed its limit, or when SCH_TASK_NEED_RETRY() rejects the combination of
 * last message type and error code.
 *
 * @param errCode    error reported for the task
 * @param needRetry  out: true when the task should be retried
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    pTask->maxExecTimes++;
    pTask->maxRetryTimes++;
    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if ((pTask->execId + 1) >= pTask->maxExecTimes) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  /*
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
        *needRetry = false;
        SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
                      SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
        return TSDB_CODE_SUCCESS;
      }
    } else {
      int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

      if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
        *needRetry = false;
        SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                      pTask->candidateIdx, candidateNum);
        return TSDB_CODE_SUCCESS;
      }
    }
  */

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

/*
 * Re-launch a failed task.
 *
 * The level's launched counter is decremented, the task leaves the job's exec
 * list and returns to INIT state. If the task is subject to flow control,
 * tasks waiting in the flow-control list are launched. The heartbeat is
 * deregistered, then a data-bind task switches to the next endpoint of its
 * current candidate while any other task switches to another candidate node.
 * Finally the task is launched again.
 *
 * @return TSDB_CODE_SUCCESS or the error code of the first failing step
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  schRemoveTaskFromExecList(pJob, pTask);
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
    SCH_SWITCH_EPSET(addr);
  } else {
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

/*
 * Fill the task's candidate address list from the job's node list.
 *
 * The address of every entry in pJob->nodeList is appended to
 * pTask->candidateAddrs.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_OUT_OF_MEMORY when an append fails;
 *         TSDB_CODE_TSC_NO_EXEC_NODE when no address could be added
 */
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t addNum = 0;
  int32_t nodeNum = 0;

  if (pJob->nodeList) {
    nodeNum = taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum; ++i) {
      SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
      SQueryNodeAddr *naddr = &nload->addr;

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId,
                    naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn,
                    SCH_GET_CUR_EP(naddr)->port);

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Build the list of candidate node addresses for a task, once.
 *
 * If the list already exists nothing is done. Otherwise the list is created
 * and filled either with the exec node named by the task's plan (when it
 * carries at least one endpoint) or, for tasks that are not data-bind, with
 * the addresses of the job's node list.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_OUT_OF_MEMORY on allocation failure;
 *         TSDB_CODE_APP_ERROR when a data-bind task has no exec node in its
 *         plan; or the error of schSetAddrsFromNodeList()
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));

  /*
    for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
      epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];

      ++epSet->numOfEps;
    }
  */

  return TSDB_CODE_SUCCESS;
}

766
/*
 * Replace the endpoint set of the task's single candidate address.
 *
 * The update is only possible when the task has exactly one candidate
 * address; its epSet is overwritten with *pEpSet.
 *
 * @param pEpSet  new endpoint set (copied)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when the task does not
 *         have exactly one candidate address
 */
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);

  char *origEpset = schDumpEpSet(&pAddr->epSet);
  char *newEpset = schDumpEpSet(pEpSet);

  // The dump strings are heap buffers (freed right below); presumably
  // schDumpEpSet() can hand back NULL when the dump cannot be produced, and
  // passing NULL for %s is undefined, so log a placeholder in that case.
  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset ? origEpset : "NULL",
                newEpset ? newEpset : "NULL");

  taosMemoryFree(origEpset);
  taosMemoryFree(newEpset);

  memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
788
/*
 * Move the task to another candidate node.
 *
 * With one candidate or none the index is left unchanged. With SCH_RANDOM a
 * random index different from the current one is picked; every other policy
 * advances to the next candidate and wraps around at the end of the list.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
  if (candidateNum <= 1) {
    goto _return;
  }

  switch (schMgmt.cfg.schPolicy) {
    case SCH_LOAD_SEQ:
    case SCH_ALL:
    default:
      if (++pTask->candidateIdx >= candidateNum) {
        pTask->candidateIdx = 0;
      }
      break;
    case SCH_RANDOM: {
      // candidateNum > 1 here, so a different index always exists
      int32_t lastIdx = pTask->candidateIdx;
      while (lastIdx == pTask->candidateIdx) {
        pTask->candidateIdx = taosRand() % candidateNum;
      }
      break;
    }
  }

_return:

  SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
818 819 820
/*
 * Remove a task from the job's table of executing tasks.
 *
 * A failed removal is only logged as a warning.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (code) {
    SCH_TASK_WLOG("task already not in execTask list, code:%x", code);
  }

  return TSDB_CODE_SUCCESS;
}

// Send a TDMT_SCH_DROP_TASK message to every node recorded in pTask->execNodes
// that has a non-NULL handle. Nodes without a handle are only logged.
// Returns early (after a debug log) when the hash is NULL or empty.
// The result of schBuildAndSendMsg is not checked.
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosHashGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t idx = 0;
  for (SSchNodeInfo *pNode = taosHashIterate(pTask->execNodes, NULL); pNode;
       pNode = taosHashIterate(pTask->execNodes, pNode), ++idx) {
    if (!pNode->handle) {
      SCH_TASK_DLOG("no need to drop task %dth execNode", idx);
      continue;
    }

    // The task handle is switched to this node's handle before the drop message is built.
    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_SCH_DROP_TASK);
    SCH_TASK_DLOG("start to drop task's %dth execNode", idx);
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", nodeNum);
}

858 859
// Walk the task status entries of a heartbeat response from node pEpId.
//
// For each entry the job/task pair is resolved through schProcessOnCbBegin;
// entries for which that call returns non-zero are skipped. An entry whose
// execId differs from the task's current execId is only logged, a FAIL status
// is currently not acted upon, and an INIT status triggers
// schRescheduleTask. Every resolved entry is closed with schProcessOnCbEnd.
// Always returns TSDB_CODE_SUCCESS.
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  SSchJob  *pJob = NULL;
  SSchTask *pTask = NULL;
  int32_t   statusNum = (int32_t)taosArrayGetSize(pStatusList);

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", statusNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t idx = 0; idx < statusNum; ++idx) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, idx);

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
           pStatus->execId, jobTaskStatusStr(pStatus->status));

    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
      continue;
    }

    int32_t code = 0;

    if (pStatus->execId != pTask->execId) {
      // TODO: status belongs to a different execution of this task
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
    } else if (JOB_TASK_STATUS_FAIL == pStatus->status) {
      // TODO: record and handle the error
    } else if (JOB_TASK_STATUS_INIT == pStatus->status) {
      code = schRescheduleTask(pJob, pTask);
    }

    schProcessOnCbEnd(pJob, pTask, code);
  }

  return TSDB_CODE_SUCCESS;
}

900 901 902 903 904 905 906
// Feed every SExplainLocalRsp in pExplainRes to its owning job/task.
//
// For each entry the job is acquired by refId, the task is looked up by tId
// and schProcessExplainRsp is invoked. After an entry is processed
// successfully its rsp is detached (numOfPlans = 0, subplanInfo = NULL) so the
// final cleanup does not free it. Processing stops at the first error.
//
// pExplainRes is consumed on every path: each entry's rsp is passed to
// tFreeSExplainRsp and the array itself is destroyed before returning.
int32_t schHandleExplainRes(SArray *pExplainRes) {
  int32_t code = 0;
  int32_t rspNum = taosArrayGetSize(pExplainRes);

  for (int32_t idx = 0; idx < rspNum; ++idx) {
    SExplainLocalRsp *pRsp = taosArrayGet(pExplainRes, idx);
    SSchTask         *pTask = NULL;
    int8_t            status = 0;

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", pRsp->qId, pRsp->tId);

    SSchJob *pJob = schAcquireJob(pRsp->rId);
    if (NULL == pJob) {
      qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, pRsp->qId,
            pRsp->tId, pRsp->rId);
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
    }

    if (schJobNeedToStop(pJob, &status)) {
      // NOTE(review): pTask is still NULL here; confirm SCH_TASK_DLOG tolerates a NULL task.
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
      schReleaseJob(pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    code = schGetTaskInJob(pJob, pRsp->tId, &pTask);
    if (TSDB_CODE_SUCCESS == code) {
      code = schProcessExplainRsp(pJob, pTask, &pRsp->rsp);
    }

    // The job reference is dropped before the result is evaluated.
    schReleaseJob(pJob->refId);

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", pRsp->qId,
           pRsp->tId, code);

    SCH_ERR_JRET(code);

    // Detach the processed rsp so the cleanup loop below leaves it alone.
    pRsp->rsp.numOfPlans = 0;
    pRsp->rsp.subplanInfo = NULL;
  }

_return:

  for (int32_t idx = 0; idx < rspNum; ++idx) {
    SExplainLocalRsp *pRsp = taosArrayGet(pExplainRes, idx);
    tFreeSExplainRsp(&pRsp->rsp);
  }

  taosArrayDestroy(pExplainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
960 961
// Launch pTask on a remote node.
//
// Steps: serialize the task's subplan into pTask->msg if that has not been
// done yet (optionally logging the plan text when tsQueryPlannerTrace is set),
// set the task's candidate addresses, ensure the heartbeat connection for
// query jobs, then build and send the message of type plan->msgType.
// Returns the first error encountered, or the result of schBuildAndSendMsg.
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
  SSubplan *pPlan = pTask->plan;
  int32_t   code = 0;

  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    code = qSubPlanToMsg(pPlan, &pTask->msg, &pTask->msgLen);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    } else if (tsQueryPlannerTrace) {
      // Trace mode: dump the plan text to the log, then release it.
      char   *planStr = NULL;
      int32_t planStrLen = 0;

      SCH_ERR_RET(qSubPlanToString(pPlan, &planStr, &planStrLen));
      SCH_TASK_DLOGL("physical plan len:%d, %s", planStrLen, planStr);
      taosMemoryFree(planStr);
    }
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, pPlan->msgType));
}

// Execute pTask locally through the client-side query worker.
//
// The query worker (schMgmt.queryMgmt) is created on first use. For explain
// jobs an array of SExplainLocalRsp is allocated, filled by
// qWorkerProcessLocalQuery and then handed to schHandleExplainRes, which
// consumes it on every path. On success schProcessOnTaskSuccess is invoked.
//
// Returns TSDB_CODE_OUT_OF_MEMORY when the explain result array cannot be
// allocated, otherwise the first error from the calls above.
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == schMgmt.queryMgmt) {
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
  }

  SArray *explainRes = NULL;
  int32_t code = 0;
  SQWMsg  qwMsg = {0};
  qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
  qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
  qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
  qwMsg.msg = pTask->plan;
  qwMsg.msgType = pTask->plan->msgType;
  qwMsg.connInfo.handle = pJob->conn.pTrans;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      // Without the array the explain output would be silently dropped; fail instead.
      SCH_TASK_ELOG("taosArrayInit %d explain rsp failed", (int)pJob->taskNum);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
                                        pTask->execId, &qwMsg, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys the array itself, so return directly on
    // error rather than jumping to _return (which would destroy it again).
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
1025 1026
int32_t schLaunchTaskImpl(void *param) {
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
dengyihao's avatar
dengyihao 已提交
1027
  SSchJob     *pJob = schAcquireJob(pCtx->jobRid);
1028 1029
  if (NULL == pJob) {
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
dengyihao's avatar
dengyihao 已提交
1030
    taosMemoryFree(param);
1031 1032
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
  }
dengyihao's avatar
dengyihao 已提交
1033

D
dapan1121 已提交
1034
  SSchTask *pTask = pCtx->pTask;
H
Haojun Liao 已提交
1035 1036 1037 1038

  if (pCtx->asyncLaunch) {
    SCH_LOCK_TASK(pTask);
  }
H
Hongze Cheng 已提交
1039 1040 1041

  int8_t  status = 0;
  int32_t code = 0;
D
dapan1121 已提交
1042 1043 1044

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
  pTask->execId++;
D
dapan1121 已提交
1045
  pTask->retryTimes++;
D
dapan1121 已提交
1046
  pTask->waitRetry = false;
D
dapan1121 已提交
1047

H
Hongze Cheng 已提交
1048 1049
  SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d",
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
D
dapan1121 已提交
1050 1051 1052 1053

  SCH_LOG_TASK_START_TS(pTask);

  if (schJobNeedToStop(pJob, &status)) {
D
dapan1121 已提交
1054
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
D
dapan1121 已提交
1055
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
D
dapan1121 已提交
1056 1057 1058
  }

  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
D
dapan1121 已提交
1059
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
D
dapan1121 已提交
1060 1061 1062
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
  }

D
dapan1121 已提交
1063
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
D
dapan1121 已提交
1064 1065 1066
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
  } else {
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
D
dapan1121 已提交
1067 1068
  }

1069
#if 0
D
dapan1121 已提交
1070
  if (SCH_IS_QUERY_JOB(pJob)) {
D
dapan1121 已提交
1071
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
D
dapan1121 已提交
1072
  }
1073
#endif
D
dapan1121 已提交
1074

D
dapan1121 已提交
1075 1076
_return:

D
dapan1121 已提交
1077 1078 1079 1080 1081 1082 1083
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    if (code) {
      code = schProcessOnTaskFailure(pJob, pTask, code);
    }
    if (code) {
      code = schHandleJobFailure(pJob, code);
    }
D
dapan1121 已提交
1084 1085
  }

H
Haojun Liao 已提交
1086 1087 1088 1089
  if (pCtx->asyncLaunch) {
    SCH_UNLOCK_TASK(pTask);
  }

1090 1091
  schReleaseJob(pJob->refId);

H
Haojun Liao 已提交
1092 1093
  taosMemoryFree(param);

D
dapan1121 已提交
1094 1095 1096
  SCH_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1097
// Build a launch context for pTask and run schLaunchTaskImpl with it.
//
// Jobs with fewer than SCH_MIN_AYSNC_EXEC_NUM tasks are launched inline and
// the launch error (if any) is returned. Larger jobs are handed to
// taosAsyncExec with asyncLaunch set; that call's result is not inspected and
// TSDB_CODE_SUCCESS is returned. The context is released by
// schLaunchTaskImpl. Returns TSDB_CODE_OUT_OF_MEMORY if the context cannot be
// allocated.
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  SSchTaskCtx *pCtx = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
  if (NULL == pCtx) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pCtx->pTask = pTask;
  pCtx->jobRid = pJob->refId;

  if (pJob->taskNum < SCH_MIN_AYSNC_EXEC_NUM) {
    // Small job: launch in the caller's thread.
    SCH_ERR_RET(schLaunchTaskImpl(pCtx));
  } else {
    pCtx->asyncLaunch = true;
    taosAsyncExec(schLaunchTaskImpl, pCtx, NULL);
  }

  return TSDB_CODE_SUCCESS;
}

// Launch pTask, honouring flow control when the task requires it.
//
// The task handle is reset to NULL first. If flow control applies, the task
// is launched only when schCheckIncTaskFlowQuota reports enough quota;
// otherwise it is launched unconditionally.
//
// Note: no more error processing, handled in function internal -- any error
// is passed to schProcessOnTaskFailure and its result is returned.
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  bool    quotaEnough = false;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  const bool needFlowCtrl = SCH_TASK_NEED_FLOW_CTRL(pJob, pTask);
  if (needFlowCtrl) {
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &quotaEnough));
  }

  if (!needFlowCtrl || quotaEnough) {
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

1140 1141 1142 1143 1144
void schHandleTimerEvent(void *param, void *tmrId) {
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
  SSchTask       *pTask = NULL;
  SSchJob        *pJob = NULL;
  int32_t         code = 0;
dengyihao's avatar
dengyihao 已提交
1145

dengyihao's avatar
dengyihao 已提交
1146 1147 1148 1149 1150 1151
  int64_t  rId = pTimerParam->rId;
  uint64_t queryId = pTimerParam->queryId;
  uint64_t taskId = pTimerParam->taskId;
  taosMemoryFree(pTimerParam);

  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, taskId)) {
1152 1153
    return;
  }
1154

1155
  code = schLaunchTask(pJob, pTask);
1156

1157
  schProcessOnCbEnd(pJob, pTask, code);
1158 1159 1160 1161 1162 1163
}

// Launch pTask, optionally after a delay of pTask->delayExecMs milliseconds.
//
// With no positive delay the task is launched immediately through
// schLaunchTask. Otherwise an SSchTimerParam carrying the job refId, query id
// and task id is allocated and handed to a timer whose callback
// (schHandleTimerEvent) frees it: the task's timer is started if it does not
// exist yet, or reset if it does.
//
// Returns TSDB_CODE_OUT_OF_MEMORY if the parameter cannot be allocated or the
// timer cannot be started; in the latter case the parameter is released here
// because the callback will never run.
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->delayExecMs <= 0) {
    SCH_RET(schLaunchTask(pJob, pTask));
  }

  SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
  if (NULL == param) {
    SCH_TASK_ELOG("taosMemoryMalloc %d failed", (int)sizeof(SSchTimerParam));
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  param->rId = pJob->refId;
  param->queryId = pJob->queryId;
  param->taskId = pTask->taskId;

  if (NULL == pTask->delayTimer) {
    pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
    if (NULL == pTask->delayTimer) {
      SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
      // No timer means no callback, so nobody else would free param.
      taosMemoryFree(param);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    return TSDB_CODE_SUCCESS;
  }

  taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202
// Launch every task of the given level, in array order.
//
// schChkJobNeedFlowCtrl is run for the level first. Stops at and returns the
// first error; returns TSDB_CODE_SUCCESS when all tasks were launched.
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  int32_t idx = 0;
  while (idx < level->taskNum) {
    SSchTask *pSubTask = taosArrayGet(level->subTasks, idx);

    SCH_ERR_RET(schLaunchTask(pJob, pSubTask));
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

// Drop every task stored in the hash `list` (values are SSchTask pointers).
//
// Does nothing unless SCH_JOB_NEED_DROP(pJob) holds. Each task is handled
// under its task lock: a pending delay timer is stopped and the task is
// dropped on its exec nodes.
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_JOB_NEED_DROP(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); pIter; pIter = taosHashIterate(list, pIter)) {
    SSchTask *pTask = *(SSchTask **)pIter;

    SCH_LOCK_TASK(pTask);

    if (pTask->delayTimer) {
      taosTmrStopA(&pTask->delayTimer);
    }

    schDropTaskOnExecNode(pJob, pTask);

    SCH_UNLOCK_TASK(pTask);
  }
}

D
dapan1121 已提交
1222 1223 1224 1225 1226
// Send the fetch message for the job's fetch task to pJob->resNode.
// The pTask argument is not used; the job's own fetchTask is always used.
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
  SSchTask *pFetchTask = pJob->fetchTask;

  SCH_RET(schBuildAndSendMsg(pJob, pFetchTask, &pJob->resNode, SCH_FETCH_TYPE(pFetchTask)));
}

// Fetch the result of pTask from the local query worker.
//
// For explain jobs an array of SExplainLocalRsp is allocated, filled by
// qWorkerProcessLocalFetch and then handed to schHandleExplainRes, which
// consumes it on every path. The fetched response is passed to
// schProcessFetchRsp.
//
// Returns TSDB_CODE_OUT_OF_MEMORY when the explain result array cannot be
// allocated, otherwise the first error from the calls above.
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  void   *pRsp = NULL;
  int32_t code = 0;
  SArray *explainRes = NULL;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      // Without the array the explain output would be silently dropped; fail instead.
      SCH_TASK_ELOG("taosArrayInit %d explain rsp failed", (int)pJob->taskNum);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
                                        pTask->execId, &pRsp, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys the array itself, so return directly on
    // error rather than jumping to _return (which would destroy it again).
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
1252 1253 1254 1255
// Start fetching the result of the job's fetch task.
//
// If pJob->fetchRes is already set nothing is done. Otherwise the fetch task
// is moved to JOB_TASK_STATUS_FETCH and the fetch is executed locally or
// remotely depending on SCH_IS_LOCAL_EXEC_TASK.
//
// Note: no more error processing, handled in function internal -- any error
// is passed to schProcessOnTaskFailure and its result is returned.
int32_t schLaunchFetchTask(SSchJob *pJob) {
  int32_t code = 0;

  void *pFetched = atomic_load_ptr(&pJob->fetchRes);
  if (NULL != pFetched) {
    SCH_JOB_DLOG("res already fetched, res:%p", pFetched);
    return TSDB_CODE_SUCCESS;
  }

  SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);

  SCH_ERR_JRET((SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) ? schExecLocalFetch(pJob, pJob->fetchTask)
                                                               : schExecRemoteFetch(pJob, pJob->fetchTask));

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}