schTask.c 38.5 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
H
Hongze Cheng 已提交
19
#include "qworker.h"
D
dapan1121 已提交
20
#include "schInt.h"
H
Hongze Cheng 已提交
21
#include "tglobal.h"
D
dapan1121 已提交
22 23 24 25 26 27
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

/**
 * Release the resources held by a task.
 *
 * Deregisters the task heartbeat, then destroys the candidate address list,
 * the pending message buffer, the child/parent link arrays, the exec-node
 * hash and the execution-time profile. The SSchTask object itself is not
 * freed here.
 *
 * @param pJob   job the task belongs to
 * @param pTask  task whose members are released
 */
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  schDeregisterTaskHb(pJob, pTask);

  if (NULL != pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
  }

  taosMemoryFreeClear(pTask->msg);

  if (NULL != pTask->children) {
    taosArrayDestroy(pTask->children);
  }

  if (NULL != pTask->parents) {
    taosArrayDestroy(pTask->parents);
  }

  if (NULL != pTask->execNodes) {
    taosHashCleanup(pTask->execNodes);
  }

  taosArrayDestroy(pTask->profile.execTime);
}

D
dapan1121 已提交
50
/**
 * Compute the retry and execution limits of a task.
 *
 * A task that is not data-bound, belongs to a query job and runs under the
 * SCH_ALL policy may retry once per node in the job's node list (but never
 * fewer than SCH_DEFAULT_MAX_RETRY_NUM times); every other task uses the
 * default. maxExecTimes is maxRetryTimes scaled by (level + 1).
 *
 * @param pJob    owning job (node list and job type are read)
 * @param pTask   task whose maxRetryTimes / maxExecTimes are written
 * @param pLevel  level the task lives in
 */
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  bool retryPerNode =
      !SCH_IS_DATA_BIND_TASK(pTask) && SCH_IS_QUERY_JOB(pJob) && (SCH_ALL == schMgmt.cfg.schPolicy);

  if (retryPerNode) {
    int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
    pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM);
  } else {
    pTask->maxRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM;
  }

  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}

D
dapan1121 已提交
61
/**
 * Initialize a task for the given subplan and level.
 *
 * Sets the plan/level links, resets the exec ids, assigns a fresh task id,
 * computes the retry limits and allocates the exec-node hash and the
 * execution-time profile array, both sized by maxExecTimes.
 *
 * @param pJob    owning job
 * @param pTask   task to initialize
 * @param pPlan   subplan executed by the task
 * @param pLevel  level the task belongs to
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when either container
 *         cannot be allocated. On failure both containers are released and
 *         their pointers reset to NULL.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->execId = -1;
  pTask->failedExecId = -2;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->taskId = schGenTaskId();

  schInitTaskRetryTimes(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  SCH_TASK_DLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);

  return TSDB_CODE_SUCCESS;

_return:

  // Clear the pointers after releasing them: schFreeTask() destroys both
  // members again, so a dangling pointer here would become a double free.
  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  SCH_RET(code);
}

/**
 * Remember the candidate address on which the task succeeded.
 *
 * Locally executed tasks are skipped. Otherwise the candidate at
 * pTask->candidateIdx is copied into pTask->succeedAddr.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when the
 *         current candidate cannot be fetched.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  SQueryNodeAddr *pCurAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (NULL != pCurAddr) {
    pTask->succeedAddr = *pCurAddr;
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                (int32_t)taosArrayGetSize(pTask->candidateAddrs));
  SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);

  return TSDB_CODE_SUCCESS;
}

/**
 * Register the node used for one execution attempt of a task.
 *
 * Stores {addr, current task handle} in pTask->execNodes keyed by execId.
 *
 * @param addr    node address the attempt was sent to
 * @param execId  execution id used as the hash key
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the hash
 *         insertion fails.
 */
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};

  int32_t putRes = taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo));
  if (0 != putRes) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle);

  return TSDB_CODE_SUCCESS;
}

/**
 * Remove the exec-node entry of one execution attempt.
 *
 * @param handle  unused by this function
 * @param execId  execution id whose entry is removed
 * @return TSDB_CODE_SUCCESS when there is no exec-node table, or when the
 *         entry was removed and execId is still the task's current attempt;
 *         TSDB_CODE_SCH_IGNORE_ERROR when the entry was already gone, when
 *         execId is no longer current, or when the task is waiting to retry.
 */
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (NULL == pTask->execNodes) {
    return TSDB_CODE_SUCCESS;
  }

  if (0 != taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_TASK_DLOG("execId %d removed from execNodeList", execId);

  bool staleExec = (execId != pTask->execId) || pTask->waitRetry;
  if (staleExec) {  // response belongs to an outdated attempt
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Store a new remote handle in the exec-node entry of an execution attempt.
 *
 * Does nothing when the exec-node table is empty or has no entry for execId.
 *
 * @param handle  handle to record
 * @param execId  execution id whose entry is updated
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (taosHashGetSize(pTask->execNodes) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSchNodeInfo *pEntry = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
  if (NULL != pEntry) {
    pEntry->handle = handle;
    SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
  } else {  // attempt no longer tracked, nothing to update
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Apply a handle reported for an execution attempt.
 *
 * With dropExecNode set, the exec-node entry is removed and that result is
 * returned. Otherwise the entry's handle is refreshed and, if execId is still
 * the live attempt, the task's current handle is replaced.
 *
 * @return TSDB_CODE_SUCCESS when the task handle was set;
 *         TSDB_CODE_SCH_IGNORE_ERROR when execId is not the current attempt,
 *         is not newer than the last failed attempt, or the task is waiting
 *         to retry; or the result of schDropTaskExecNode().
 */
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  schUpdateTaskExecNode(pJob, pTask, handle, execId);

  bool notCurrent = (execId != pTask->execId);
  bool alreadyFailed = (execId <= pTask->failedExecId);
  if (notCurrent || alreadyFailed || pTask->waitRetry) {  // outdated attempt
    SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);

  return TSDB_CODE_SUCCESS;
}

/**
 * Handle the failure of a task execution attempt.
 *
 * Records the failed exec id, then either retries the task (when
 * schTaskCheckSetRetry() allows it) or marks it failed. For jobs that must
 * wait for all tasks of a level, the level's failure counter is advanced and
 * the job error code is returned only once every task of the level is done.
 *
 * @param errCode  error reported for the attempt
 * @return TSDB_CODE_SUCCESS when a retry was started;
 *         TSDB_CODE_SCH_IGNORE_ERROR when errCode is the ignore code, the job
 *         is stopping, or other tasks of the level are still pending;
 *         TSDB_CODE_SCH_STATUS_ERROR when the task was already done;
 *         otherwise errCode or the job-level error code.
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  pTask->failedExecId = pTask->execId;

  int8_t jobStatus = 0;
  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t curStatus = SCH_GET_TASK_STATUS(pTask);
  if (JOB_TASK_STATUS_FAIL == curStatus || JOB_TASK_STATUS_SUCC == curStatus) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(curStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    SCH_LOG_TASK_WAIT_TS(pTask);
  } else {
    SCH_LOG_TASK_END_TS(pTask);
  }

  bool needRetry = false;

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

  if (!SCH_JOB_NEED_WAIT(pJob)) {
    SCH_RET(errCode);
  }

  // The job waits for the whole level: account for this failure first.
  int32_t doneNum = 0;
  SCH_LOCK(SCH_WRITE, &pTask->level->lock);
  pTask->level->taskFailed++;
  doneNum = pTask->level->taskSucceed + pTask->level->taskFailed;
  SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

  schUpdateJobErrCode(pJob, errCode);

  if (doneNum < pTask->level->taskNum) {
    SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", doneNum, pTask->level->taskNum);
    SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_RET(atomic_load_32(&pJob->errCode));
}

/**
 * Handle the successful completion of a task.
 *
 * Marks the task partially succeeded, records the node it ran on and
 * releases flow-controlled tasks. A task without parents drives the job
 * state: for jobs that wait on the whole level, the job switches state only
 * after every task of the level is done; otherwise the task becomes the
 * job's fetch task. A task with parents publishes itself as a downstream
 * source to each parent and launches the parents that have all children
 * ready, then launches the job's lower level.
 *
 * @return TSDB_CODE_SUCCESS or the first error reported by a callee.
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));

  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = (NULL != pTask->parents) ? (int32_t)taosArrayGetSize(pTask->parents) : 0;

  if (0 == parentNum) {
    if (!SCH_JOB_NEED_WAIT(pJob)) {
      pJob->resNode = pTask->succeedAddr;
      pJob->fetchTask = pTask;

      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    }

    int32_t doneNum = 0;
    SCH_LOCK(SCH_WRITE, &pTask->level->lock);
    pTask->level->taskSucceed++;
    doneNum = pTask->level->taskSucceed + pTask->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

    if (doneNum < pTask->level->taskNum) {
      SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", doneNum, pTask->level->taskNum);
      return TSDB_CODE_SUCCESS;
    }

    if (doneNum > pTask->level->taskNum) {
      SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", doneNum, pTask->level->taskNum);
    }

    if (pTask->level->taskFailed > 0) {
      SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
    }

    SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
  }

  for (int32_t idx = 0; idx < parentNum; ++idx) {
    SSchTask *pParent = *(SSchTask **)taosArrayGet(pTask->parents, idx);

    // Publish this task as a data source of the parent's plan.
    SCH_LOCK(SCH_WRITE, &pParent->planLock);
    SDownstreamSourceNode srcNode = {
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
        .taskId = pTask->taskId,
        .schedId = schMgmt.sId,
        .execId = pTask->execId,
        .addr = pTask->succeedAddr,
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
    };
    qSetSubplanExecutionNode(pParent->plan, pTask->plan->id.groupId, &srcNode);
    SCH_UNLOCK(SCH_WRITE, &pParent->planLock);

    int32_t readyNum = atomic_add_fetch_32(&pParent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, pParent)) {
      SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, pParent->taskId);
      SCH_ERR_RET(schLaunchTask(pJob, pParent));
    }
  }

  SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

/**
 * Reschedule a timed-out task on another candidate node.
 *
 * Nothing happens when rescheduling is disabled or the task is data-bound.
 * Otherwise, a task that has timed out while executing, is not the job's
 * fetch task and has more than one candidate address is dropped on its
 * exec nodes and run through failure processing with a timeout error.
 *
 * @return TSDB_CODE_SUCCESS, or the error from schProcessOnTaskFailure().
 */
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule || SCH_IS_DATA_BIND_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  if (!(SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask &&
        taosArrayGetSize(pTask->candidateAddrs) > 1)) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);

  SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));

  return TSDB_CODE_SUCCESS;
}

346
/**
 * Update the redirect-retry bookkeeping of a task and decide the delay
 * before its next launch.
 *
 * On the first redirect the context is initialized: start timestamp, initial
 * period (tsRedirectPeriod) and the number of attempts per round (endpoint
 * count for data-bound tasks, 1 otherwise). On later redirects the counters
 * advance; once a full round is used up, the elapsed time is checked against
 * tsMaxRetryWaitTime, the period is scaled by tsRedirectFactor (capped at
 * tsRedirectMaxPeriod) and pTask->delayExecMs is set to the smaller of the
 * period and the remaining time. Within a round the delay is 0.
 *
 * @param pEpSet   endpoint set carried by the redirect response, may be NULL
 * @param rspCode  redirect error code reported by the server
 * @return TSDB_CODE_SUCCESS; the redirect code of the job once the total wait
 *         time is exceeded; TSDB_CODE_SCH_INTERNAL_ERROR when a data-bound
 *         task has no candidate address to derive the round size from.
 */
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) {
  SSchRedirectCtx *pCtx = &pTask->redirectCtx;
  if (!pCtx->inRedirect) {
    pCtx->inRedirect = true;
    pCtx->periodMs = tsRedirectPeriod;
    pCtx->startTs = taosGetTimestampMs();

    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      if (pEpSet) {
        pCtx->roundTotal = pEpSet->numOfEps;
      } else {
        SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
        if (NULL == pAddr) {
          // No candidate to read the endpoint count from; fail instead of
          // dereferencing NULL.
          SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", 0,
                        (int32_t)taosArrayGetSize(pTask->candidateAddrs));
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
        pCtx->roundTotal = pAddr->epSet.numOfEps;
      }
    } else {
      pCtx->roundTotal = 1;
    }

    goto _return;
  }

  pCtx->totalTimes++;
  pCtx->roundTimes++;

  // A fresh endpoint set may change how many attempts make up one round.
  if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
    pCtx->roundTotal = pEpSet->numOfEps;
  }

  if (pCtx->roundTimes >= pCtx->roundTotal) {
    int64_t nowTs = taosGetTimestampMs();
    int64_t lastTime = nowTs - pCtx->startTs;
    if (lastTime > tsMaxRetryWaitTime) {
      SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
                    nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
      SCH_ERR_RET(SCH_GET_REDICT_CODE(pJob, rspCode));
    }

    // Back off before the next round, never beyond the configured maximum.
    pCtx->periodMs *= tsRedirectFactor;
    if (pCtx->periodMs > tsRedirectMaxPeriod) {
      pCtx->periodMs = tsRedirectMaxPeriod;
    }

    // Do not sleep past the overall retry deadline.
    int64_t leftTime = tsMaxRetryWaitTime - lastTime;
    pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;

    pCtx->roundTimes = 0;

    goto _return;
  }

  pTask->delayExecMs = 0;

_return:

  SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal,
                pCtx->totalTimes, pTask->delayExecMs);

  return TSDB_CODE_SUCCESS;
}

407
/**
 * Redirect a task so that it can be launched again.
 *
 * Updates the redirect context, drops the task on its exec nodes and resets
 * its launch state (exec list entry, heartbeat, message buffer, succeed
 * address). A data-bound task then adjusts its target endpoint (new epset
 * from the response, same endpoint, or next endpoint) and is relaunched
 * after pTask->delayExecMs. Any other (merge) task clears its plan's
 * execution nodes, optionally switches candidate and recursively redirects
 * all of its children.
 *
 * @param pData    response buffer; NULL when invoked for a child of a
 *                 redirected task
 * @param rspCode  redirect error code reported by the server
 * @return TSDB_CODE_SUCCESS, or the result of schProcessOnTaskFailure() when
 *         any step fails.
 */
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));

  if (NULL == pData) {
    pTask->retryTimes = 0;
  }

  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
    SCH_UPDATE_REDICT_CODE(pJob, rspCode);
  }

  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));

  pTask->waitRetry = true;

  // Tear down everything belonging to the current attempt.
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);
  schRemoveTaskFromExecList(pJob, pTask);
  schDeregisterTaskHb(pJob, pTask);
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
  taosMemoryFreeClear(pTask->msg);
  pTask->msgLen = 0;
  pTask->lastMsgType = 0;
  memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    if (pData && pData->pEpSet) {
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
    } else {
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
      if (NULL == addr) {
        // Guard against a missing candidate instead of dereferencing NULL.
        SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
        // Retry on the endpoint currently in use.
        SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
        SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
                      addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
      } else {
        SCH_SWITCH_EPSET(addr);
        SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse,
                      addr->epSet.numOfEps);
      }
    }

    if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
      if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) {
        SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
      }
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  // merge plan

  pTask->childReady = 0;

  qClearSubplanExecutionNode(pTask->plan);

  // Note: current error task and upper level merge task
  if ((pData && 0 == pData->len) || NULL == pData) {
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  // Children are redirected best-effort; each one handles its own failure.
  int32_t childrenNum = taosArrayGetSize(pTask->children);
  for (int32_t i = 0; i < childrenNum; ++i) {
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
    SCH_LOCK_TASK(pChild);
    schDoTaskRedirect(pJob, pChild, NULL, rspCode);
    SCH_UNLOCK_TASK(pChild);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

489
/**
 * Entry point for a redirect response of a task.
 *
 * If the job is already partially succeeded and its result has been fetched,
 * the redirect is treated as a failure with rspCode; otherwise the job goes
 * back to the executing state. A redirect to another leader must carry an
 * endpoint set. The response payload and endpoint set in pData are released
 * on every path.
 *
 * @param pData    response buffer; its pData / pEpSet members are freed here
 * @param rspCode  redirect error code reported by the server
 * @return result of schDoTaskRedirect(), or of schProcessOnTaskFailure() on
 *         the error path.
 */
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  if (JOB_TASK_STATUS_PART_SUCC == pJob->status) {
    SCH_LOCK(SCH_WRITE, &pJob->resLock);
    if (pJob->fetched) {
      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
      SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
    }
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

    schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
  }

  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode) && NULL == pData->pEpSet) {
    SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
    code = TSDB_CODE_INVALID_MSG;
    goto _return;
  }

  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);

  taosMemoryFreeClear(pData->pData);
  taosMemoryFreeClear(pData->pEpSet);

  SCH_RET(code);

_return:

  taosMemoryFreeClear(pData->pData);
  taosMemoryFreeClear(pData->pEpSet);

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

/**
 * Add a task to the job's table of executing tasks, keyed by task id.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SCH_INTERNAL_ERROR when the task is
 *         already in the table; TSDB_CODE_OUT_OF_MEMORY for any other
 *         insertion failure.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putRes = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);

  if (putRes != 0) {
    if (HASH_NODE_EXIST(putRes)) {
      SCH_TASK_ELOG("task already in execTask list, code:%x", putRes);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

/*
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
560
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
587
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
612
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
613 614 615 616 617 618 619 620 621 622 623 624 625
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}
*/

/**
 * Decide whether a failed task may be retried.
 *
 * A timeout grants one extra retry/exec attempt and doubles the task timeout
 * (capped at SCH_MAX_TASK_TIMEOUT_USEC). The task is not retried when the
 * retry budget or the exec budget is exhausted, or when SCH_NEED_RETRY()
 * rejects the error for the last message type.
 *
 * @param errCode    error of the failed attempt
 * @param needRetry  out: true when the task should be retried
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  *needRetry = false;

  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    pTask->maxExecTimes++;
    pTask->maxRetryTimes++;
    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if ((pTask->execId + 1) >= pTask->maxExecTimes) {
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  // A former check that stopped retrying once every endpoint / candidate had
  // been tried is disabled; only the budgets above limit retries.

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

/**
 * Relaunch a task after a failed attempt.
 *
 * Undoes the launch bookkeeping (launched counter, exec list entry,
 * heartbeat), resets the task to the INIT state, releases flow-controlled
 * tasks if needed, moves on to the next endpoint (data-bound task) or the
 * next candidate node (other tasks) and launches the task again.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SCH_INTERNAL_ERROR when a data-bound
 *         task has no current candidate address; or the first error reported
 *         by a callee.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  schRemoveTaskFromExecList(pJob, pTask);
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
    if (NULL == addr) {
      // Report a missing candidate instead of dereferencing NULL below.
      SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                    (int32_t)taosArrayGetSize(pTask->candidateAddrs));
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
    SCH_SWITCH_EPSET(addr);
  } else {
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

/**
 * Fill the task's candidate address list from the job's node list.
 *
 * Every node address in pJob->nodeList is appended to
 * pTask->candidateAddrs.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_OUT_OF_MEMORY when an append fails;
 *         TSDB_CODE_TSC_NO_EXEC_NODE when no address could be added.
 */
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t added = 0;
  int32_t nodeNum = (NULL != pJob->nodeList) ? (int32_t)taosArrayGetSize(pJob->nodeList) : 0;

  for (int32_t i = 0; i < nodeNum; ++i) {
    SQueryNodeLoad *pLoad = taosArrayGet(pJob->nodeList, i);
    SQueryNodeAddr *pNodeAddr = &pLoad->addr;

    if (NULL == taosArrayPush(pTask->candidateAddrs, pNodeAddr)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", added, errno);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, pNodeAddr->nodeId,
                  pNodeAddr->epSet.inUse, pNodeAddr->epSet.numOfEps, SCH_GET_CUR_EP(pNodeAddr)->fqdn,
                  SCH_GET_CUR_EP(pNodeAddr)->port);

    ++added;
  }

  if (added <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Build the candidate address list of a task, once.
 *
 * If the list already exists nothing is done. Otherwise the list is created
 * and filled either with the exec node pinned in the subplan (when it has
 * endpoints) or, for tasks that are not data-bound, with the job's node list.
 *
 * On any failure after the list was allocated, the list is destroyed and the
 * pointer reset to NULL, so that the entry guard does not mistake a
 * half-built (empty) list for a completed initialization on the next call.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_OUT_OF_MEMORY on allocation failure;
 *         TSDB_CODE_APP_ERROR when a data-bound task has no exec node in its
 *         plan; or the error from schSetAddrsFromNodeList().
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
    SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  SCH_ERR_JRET(schSetAddrsFromNodeList(pJob, pTask));

  return TSDB_CODE_SUCCESS;

_return:

  // Roll back the allocation so a later call starts from scratch.
  taosArrayDestroy(pTask->candidateAddrs);
  pTask->candidateAddrs = NULL;

  SCH_RET(code);
}

778
/**
 * Replace the endpoint set of a task's single candidate address.
 *
 * Only valid when the task has exactly one candidate address and a new
 * endpoint set is supplied; the candidate's epSet is overwritten with
 * *pEpSet.
 *
 * @param pEpSet  new endpoint set, must not be NULL
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when pEpSet is NULL or
 *         the task does not have exactly one candidate address.
 */
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
  if (NULL == pEpSet || NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
    SCH_TASK_ELOG("not able to update candidate addr, addr num %d",
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
  if (NULL == pAddr) {
    SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", 0,
                  (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  // The dump strings are heap-allocated (they are freed below) and are only
  // needed for the log line; presumably schDumpEpSet() returns NULL when the
  // allocation fails, so never hand a NULL pointer to "%s".
  char *origEpset = schDumpEpSet(&pAddr->epSet);
  char *newEpset = schDumpEpSet(pEpSet);

  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset ? origEpset : "NULL",
                newEpset ? newEpset : "NULL");

  taosMemoryFree(origEpset);
  taosMemoryFree(newEpset);

  memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
800
/**
 * Advance the task to another candidate address.
 *
 * With at most one candidate nothing changes. Under the SCH_RANDOM policy a
 * random index different from the current one is drawn; under every other
 * policy the index moves to the next candidate and wraps around to 0.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

  if (candidateNum > 1) {
    if (SCH_RANDOM == schMgmt.cfg.schPolicy) {
      int32_t prevIdx = pTask->candidateIdx;
      do {
        pTask->candidateIdx = taosRand() % candidateNum;
      } while (prevIdx == pTask->candidateIdx);
    } else {
      // SCH_LOAD_SEQ, SCH_ALL and any other policy: round-robin.
      pTask->candidateIdx++;
      if (pTask->candidateIdx >= candidateNum) {
        pTask->candidateIdx = 0;
      }
    }
  }

  SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
830 831 832
/**
 * Remove a task from the job's table of executing tasks.
 *
 * A task that is not in the table only produces a warning.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t rmRes = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (0 != rmRes) {
    SCH_TASK_WLOG("task already not in execTask list, code:%x", rmRes);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Send a TDMT_SCH_DROP_TASK message to every node recorded in
 * pTask->execNodes that has a non-NULL handle.
 *
 * Nothing is sent when the hash is NULL or empty. The result of
 * schBuildAndSendMsg is not checked.
 */
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosHashGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t idx = 0;
  for (SSchNodeInfo *pNode = taosHashIterate(pTask->execNodes, NULL); NULL != pNode;
       pNode = taosHashIterate(pTask->execNodes, pNode), ++idx) {
    if (NULL == pNode->handle) {
      SCH_TASK_DLOG("no need to drop task %dth execNode", idx);
      continue;
    }

    // the drop message is sent with this node's handle set on the task
    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_SCH_DROP_TASK);
    SCH_TASK_DLOG("start to drop task's %dth execNode", idx);
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", nodeNum);
}

870 871
/*
 * Handle the list of task statuses carried by a heartbeat response.
 *
 * For each entry the job/task pair is resolved through schProcessOnCbBegin
 * (entries that cannot be resolved are skipped). A task reported in
 * JOB_TASK_STATUS_INIT with a matching execId is rescheduled; all other
 * cases currently end the callback with code 0.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  SSchJob  *pJob = NULL;
  SSchTask *pTask = NULL;
  int32_t   statusNum = (int32_t)taosArrayGetSize(pStatusList);

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", statusNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t idx = 0; idx < statusNum; ++idx) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, idx);
    int32_t      cbCode = 0;

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
           pStatus->execId, jobTaskStatusStr(pStatus->status));

    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
      continue;
    }

    if (pStatus->execId != pTask->execId) {
      // TODO
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
    } else if (JOB_TASK_STATUS_FAIL == pStatus->status) {
      // RECORD AND HANDLE ERROR!!!! (not implemented yet, nothing is done for a failed task)
    } else if (JOB_TASK_STATUS_INIT == pStatus->status) {
      cbCode = schRescheduleTask(pJob, pTask);
    }

    schProcessOnCbEnd(pJob, pTask, cbCode);
  }

  return TSDB_CODE_SUCCESS;
}

912 913 914 915 916 917 918
/*
 * Process the explain responses collected from locally executed tasks.
 *
 * Each SExplainLocalRsp in pExplainRes is handed to schProcessExplainRsp for
 * the job/task it names. Processing stops at the first error. Before
 * returning, every response left in the array is released with
 * tFreeSExplainRsp and the array itself is destroyed, on success and on
 * failure alike, so the caller must not touch pExplainRes afterwards.
 *
 * Returns 0 on success, TSDB_CODE_QRY_JOB_NOT_EXIST when a job can no longer
 * be acquired, TSDB_CODE_SCH_IGNORE_ERROR when the job is stopping, or the
 * error produced by schGetTaskInJob / schProcessExplainRsp.
 */
int32_t schHandleExplainRes(SArray *pExplainRes) {
  int32_t code = 0;
  int32_t resNum = taosArrayGetSize(pExplainRes);
  if (resNum <= 0) {
    goto _return;
  }

  SSchTask *pTask = NULL;
  SSchJob  *pJob = NULL;

  for (int32_t i = 0; i < resNum; ++i) {
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);

    pJob = schAcquireJob(localRsp->rId);
    if (NULL == pJob) {
      qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId,
            localRsp->tId, localRsp->rId);
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
    }

    int8_t status = 0;
    if (schJobNeedToStop(pJob, &status)) {
      // The task has not been looked up yet (pTask is still NULL here), so log
      // with the ids carried by the response instead of the task log macro.
      qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", will not do further processing cause of job status %s",
             localRsp->qId, localRsp->tId, jobTaskStatusStr(status));
      schReleaseJob(pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    code = schGetTaskInJob(pJob, localRsp->tId, &pTask);

    if (TSDB_CODE_SUCCESS == code) {
      code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
    }

    schReleaseJob(pJob->refId);

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId,
           localRsp->tId, code);

    SCH_ERR_JRET(code);

    // Clear the response so the cleanup loop below does not free it again.
    // NOTE(review): presumably schProcessExplainRsp took over subplanInfo - confirm.
    localRsp->rsp.numOfPlans = 0;
    localRsp->rsp.subplanInfo = NULL;
    pTask = NULL;
    pJob = NULL;
  }

_return:

  for (int32_t i = 0; i < resNum; ++i) {
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
    tFreeSExplainRsp(&localRsp->rsp);
  }

  taosArrayDestroy(pExplainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
972 973
/*
 * Launch pTask on a remote node.
 *
 * The sub-plan is serialized into pTask->msg on first use (and dumped to the
 * log when tsQueryPlannerTrace is set), candidate addresses are resolved,
 * the heartbeat connection is ensured for query jobs, and finally the
 * plan's message is sent.
 *
 * Returns the first error encountered, or the result of schBuildAndSendMsg.
 */
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t   code = 0;
  SSubplan *pPlan = pTask->plan;

  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    code = qSubPlanToMsg(pPlan, &pTask->msg, &pTask->msgLen);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }

    if (tsQueryPlannerTrace) {
      int32_t planStrLen = 0;
      char   *planStr = NULL;
      SCH_ERR_RET(qSubPlanToString(pPlan, &planStr, &planStrLen));
      SCH_TASK_DLOGL("physical plan len:%d, %s", planStrLen, planStr);
      taosMemoryFree(planStr);
    }
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, pPlan->msgType));
}

/*
 * Execute pTask in-process through the client-side query worker.
 *
 * The query worker (schMgmt.queryMgmt) is created if it does not exist yet.
 * For explain jobs an array is allocated to collect the explain responses
 * and is then handed to schHandleExplainRes, which always destroys it. On
 * success the task is reported through schProcessOnTaskSuccess.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the explain result
 * array cannot be allocated, or the error of the failing step.
 */
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  // SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
  if (NULL == schMgmt.queryMgmt) {
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
  }

  SArray *explainRes = NULL;
  int32_t code = 0;
  SQWMsg  qwMsg = {0};
  qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
  qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
  qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
  qwMsg.msg = pTask->plan;
  qwMsg.msgType = pTask->plan->msgType;
  qwMsg.connInfo.handle = pJob->conn.pTrans;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      // Without the array the explain output would be dropped silently.
      SCH_TASK_ELOG("taosArrayInit %d explain rsp failed", pJob->taskNum);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
                                        pTask->execId, &qwMsg, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys explainRes on every path, so return
    // directly on failure instead of jumping to _return (avoids a double free).
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
1037 1038
int32_t schLaunchTaskImpl(void *param) {
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
dengyihao's avatar
dengyihao 已提交
1039
  SSchJob     *pJob = schAcquireJob(pCtx->jobRid);
1040 1041
  if (NULL == pJob) {
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
dengyihao's avatar
dengyihao 已提交
1042
    taosMemoryFree(param);
1043 1044
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
  }
dengyihao's avatar
dengyihao 已提交
1045

D
dapan1121 已提交
1046
  SSchTask *pTask = pCtx->pTask;
H
Haojun Liao 已提交
1047 1048 1049 1050

  if (pCtx->asyncLaunch) {
    SCH_LOCK_TASK(pTask);
  }
H
Hongze Cheng 已提交
1051 1052 1053

  int8_t  status = 0;
  int32_t code = 0;
D
dapan1121 已提交
1054 1055 1056

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
  pTask->execId++;
D
dapan1121 已提交
1057
  pTask->retryTimes++;
D
dapan1121 已提交
1058
  pTask->waitRetry = false;
D
dapan1121 已提交
1059

H
Hongze Cheng 已提交
1060 1061
  SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d",
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
D
dapan1121 已提交
1062 1063 1064 1065

  SCH_LOG_TASK_START_TS(pTask);

  if (schJobNeedToStop(pJob, &status)) {
D
dapan1121 已提交
1066
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
D
dapan1121 已提交
1067
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
D
dapan1121 已提交
1068 1069 1070 1071
  }

  // NOTE: race condition: the task should be put into the hash table before send msg to server
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
D
dapan1121 已提交
1072
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
D
dapan1121 已提交
1073 1074 1075
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
  }

D
dapan1121 已提交
1076
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
D
dapan1121 已提交
1077 1078 1079
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
  } else {
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
D
dapan1121 已提交
1080 1081
  }

1082
#if 0
D
dapan1121 已提交
1083
  if (SCH_IS_QUERY_JOB(pJob)) {
D
dapan1121 已提交
1084
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
D
dapan1121 已提交
1085
  }
1086
#endif
D
dapan1121 已提交
1087

D
dapan1121 已提交
1088 1089
_return:

D
dapan1121 已提交
1090 1091 1092 1093 1094 1095 1096
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    if (code) {
      code = schProcessOnTaskFailure(pJob, pTask, code);
    }
    if (code) {
      code = schHandleJobFailure(pJob, code);
    }
D
dapan1121 已提交
1097 1098
  }

H
Haojun Liao 已提交
1099 1100 1101 1102
  if (pCtx->asyncLaunch) {
    SCH_UNLOCK_TASK(pTask);
  }

1103 1104
  schReleaseJob(pJob->refId);

H
Haojun Liao 已提交
1105 1106
  taosMemoryFree(param);

D
dapan1121 已提交
1107 1108 1109
  SCH_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1110
/*
 * Build a launch context for pTask and run schLaunchTaskImpl with it.
 *
 * Jobs with at least SCH_MIN_AYSNC_EXEC_NUM tasks are launched through
 * taosAsyncExec (its result is not checked); smaller jobs are launched
 * inline and the launch error, if any, is returned. The context is freed by
 * schLaunchTaskImpl.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the context cannot be allocated.
 */
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  SSchTaskCtx *pCtx = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
  if (NULL == pCtx) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pCtx->pTask = pTask;
  pCtx->jobRid = pJob->refId;

  if (pJob->taskNum < SCH_MIN_AYSNC_EXEC_NUM) {
    // small job: launch synchronously on the calling thread
    SCH_ERR_RET(schLaunchTaskImpl(pCtx));
    return TSDB_CODE_SUCCESS;
  }

  pCtx->asyncLaunch = true;
  taosAsyncExec(schLaunchTaskImpl, pCtx, NULL);

  return TSDB_CODE_SUCCESS;
}

// Note: no more error processing, handled in function internal
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  bool    enough = false;
  int32_t code = 0;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));

    if (enough) {
D
dapan1121 已提交
1140
      SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
D
dapan1121 已提交
1141 1142
    }
  } else {
D
dapan1121 已提交
1143
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
D
dapan1121 已提交
1144 1145 1146 1147 1148 1149 1150 1151 1152
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

1153 1154 1155 1156 1157
void schHandleTimerEvent(void *param, void *tmrId) {
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
  SSchTask       *pTask = NULL;
  SSchJob        *pJob = NULL;
  int32_t         code = 0;
dengyihao's avatar
dengyihao 已提交
1158

dengyihao's avatar
dengyihao 已提交
1159 1160 1161 1162 1163 1164
  int64_t  rId = pTimerParam->rId;
  uint64_t queryId = pTimerParam->queryId;
  uint64_t taskId = pTimerParam->taskId;
  taosMemoryFree(pTimerParam);

  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, taskId)) {
1165 1166
    return;
  }
1167

1168
  code = schLaunchTask(pJob, pTask);
1169

1170
  schProcessOnCbEnd(pJob, pTask, code);
1171 1172 1173 1174 1175 1176
}

/*
 * Launch pTask, optionally after pTask->delayExecMs milliseconds.
 *
 * With no delay configured the task is launched right away. Otherwise a
 * SSchTimerParam describing the job/task is allocated and passed to the
 * delay timer (started on first use, reset afterwards);
 * schHandleTimerEvent frees it when the timer fires.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the parameter cannot be allocated or
 * the timer cannot be started, otherwise TSDB_CODE_SUCCESS or the result of
 * schLaunchTask.
 */
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->delayExecMs <= 0) {
    SCH_RET(schLaunchTask(pJob, pTask));
  }

  SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
  if (NULL == param) {
    SCH_TASK_ELOG("taosMemoryMalloc %d failed", (int)sizeof(SSchTimerParam));
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  param->rId = pJob->refId;
  param->queryId = pJob->queryId;
  param->taskId = pTask->taskId;

  if (NULL == pTask->delayTimer) {
    pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
    if (NULL == pTask->delayTimer) {
      SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
      // No timer was created, so the callback that would free param never runs.
      taosMemoryFree(param);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    return TSDB_CODE_SUCCESS;
  }

  // NOTE(review): if the previous timer is still pending when it is reset, the
  // param given to it may never reach schHandleTimerEvent - confirm ownership.
  taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215
/*
 * Launch every task of one level.
 *
 * The job's flow-control requirement is checked first; the tasks in
 * level->subTasks are then launched in index order, stopping at the first
 * launch that returns an error.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  int32_t idx = 0;
  while (idx < level->taskNum) {
    SSchTask *pSubTask = taosArrayGet(level->subTasks, idx);
    SCH_ERR_RET(schLaunchTask(pJob, pSubTask));
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Drop every task stored in the given hash on its exec nodes.
 *
 * Does nothing unless SCH_JOB_NEED_DROP(pJob) holds. Each hash entry holds a
 * SSchTask pointer; under the task lock any pending delay timer is stopped
 * and the task is dropped via schDropTaskOnExecNode.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_JOB_NEED_DROP(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); NULL != pIter; pIter = taosHashIterate(list, pIter)) {
    SSchTask *pTask = *(SSchTask **)pIter;

    SCH_LOCK_TASK(pTask);
    if (pTask->delayTimer) {
      taosTmrStopA(&pTask->delayTimer);
    }
    schDropTaskOnExecNode(pJob, pTask);
    SCH_UNLOCK_TASK(pTask);
  }
}

D
dapan1121 已提交
1235 1236 1237 1238 1239
/*
 * Send the fetch message for the job's fetch task to pJob->resNode.
 * The pTask argument is not used; the job's own fetchTask is always taken.
 */
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
  SSchTask *pFetchTask = pJob->fetchTask;

  SCH_RET(schBuildAndSendMsg(pJob, pFetchTask, &pJob->resNode, SCH_FETCH_TYPE(pFetchTask)));
}

/*
 * Fetch the result of a locally executed task from the client-side query
 * worker and feed it to schProcessFetchRsp.
 *
 * For explain jobs an array is allocated to collect the explain responses
 * and is then handed to schHandleExplainRes, which always destroys it.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the explain result
 * array cannot be allocated, or the error of the failing step.
 */
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  void   *pRsp = NULL;
  int32_t code = 0;
  SArray *explainRes = NULL;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      // Without the array the explain output would be dropped silently.
      SCH_TASK_ELOG("taosArrayInit %d explain rsp failed", pJob->taskNum);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
                                        pTask->execId, &pRsp, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys explainRes on every path, so return
    // directly on failure instead of jumping to _return (avoids a double free).
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
1265 1266 1267 1268
// Note: no more error processing, handled in function internal
int32_t schLaunchFetchTask(SSchJob *pJob) {
  int32_t code = 0;

D
dapan1121 已提交
1269 1270 1271
  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
  if (fetchRes) {
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
D
dapan1121 已提交
1272 1273 1274
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
1275 1276 1277 1278 1279
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
    SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
  } else {
    SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
  }
D
dapan1121 已提交
1280 1281 1282 1283 1284 1285 1286

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}