schTask.c 38.4 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
H
Hongze Cheng 已提交
19
#include "qworker.h"
D
dapan1121 已提交
20
#include "schInt.h"
H
Hongze Cheng 已提交
21
#include "tglobal.h"
D
dapan1121 已提交
22 23 24 25 26 27
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

/*
 * Release the resources owned by a scheduler task.
 *
 * Calls schDeregisterTaskHb() for the task, then frees the candidate address
 * list, the cached request message, the parent/child link arrays, the
 * exec-node hash and the per-execution timing array.
 *
 * Every released pointer is reset to NULL. A second call, or any later read of
 * these fields, therefore sees "nothing allocated" instead of freed memory.
 */
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  schDeregisterTaskHb(pJob, pTask);

  if (pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
    pTask->candidateAddrs = NULL;
  }

  taosMemoryFreeClear(pTask->msg);

  if (pTask->children) {
    taosArrayDestroy(pTask->children);
    pTask->children = NULL;
  }

  if (pTask->parents) {
    taosArrayDestroy(pTask->parents);
    pTask->parents = NULL;
  }

  if (pTask->execNodes) {
    taosHashCleanup(pTask->execNodes);
    pTask->execNodes = NULL;
  }

  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
}

D
dapan1121 已提交
50
/*
 * Compute the retry and execution budgets of a task.
 *
 * The fixed SCH_DEFAULT_MAX_RETRY_NUM budget applies to data-bound tasks,
 * non-query jobs (per SCH_IS_QUERY_JOB), and any scheduling policy other than
 * SCH_ALL. Otherwise the budget is the number of nodes in the job's node list,
 * but never less than that default. The execution budget is the retry budget
 * multiplied by (level index + 1).
 */
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  bool fixedBudget = SCH_IS_DATA_BIND_TASK(pTask) || !SCH_IS_QUERY_JOB(pJob) || SCH_ALL != schMgmt.cfg.schPolicy;

  int32_t retryBudget = SCH_DEFAULT_MAX_RETRY_NUM;
  if (!fixedBudget) {
    int32_t numOfNodes = taosArrayGetSize(pJob->nodeList);
    retryBudget = TMAX(numOfNodes, SCH_DEFAULT_MAX_RETRY_NUM);
  }

  pTask->maxRetryTimes = retryBudget;
  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}

D
dapan1121 已提交
61
/*
 * Initialise a task for subplan `pPlan` on level `pLevel`.
 *
 * Assigns a fresh task id, resets execId to -1, sets the default timeout and
 * retry/exec budgets, and allocates the exec-node hash and the execution-time
 * array (both pre-sized by maxExecTimes). On success the status becomes
 * JOB_TASK_STATUS_INIT.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if either allocation
 * fails. On failure both containers are released and their pointers reset to
 * NULL, so that a later schFreeTask() on this task does not free them again.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->execId = -1;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->taskId = schGenTaskId();

  schInitTaskRetryTimes(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  SCH_TASK_DLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);

  return TSDB_CODE_SUCCESS;

_return:

  // Reset after freeing: the task object outlives this call and may later be
  // passed to schFreeTask(), which frees these same fields.
  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  SCH_RET(code);
}

/*
 * Copy the candidate address at `candidateIdx` into pTask->succeedAddr.
 *
 * Tasks executed locally (SCH_IS_LOCAL_EXEC_TASK) are left untouched.
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR if candidateIdx does not select an entry
 * of candidateAddrs.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    SQueryNodeAddr *pChosen = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
    if (pChosen == NULL) {
      SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                    (int32_t)taosArrayGetSize(pTask->candidateAddrs));
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    pTask->succeedAddr = *pChosen;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Record in pTask->execNodes (keyed by execId) that execution `execId` was
 * sent to `addr`, together with the task's current handle.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the hash insertion fails.
 */
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo entry = {.handle = SCH_GET_TASK_HANDLE(pTask), .addr = *addr};

  int32_t putRes = taosHashPut(pTask->execNodes, &execId, sizeof(execId), &entry, sizeof(entry));
  if (putRes != 0) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, entry.handle);

  return TSDB_CODE_SUCCESS;
}

/*
 * Remove execution `execId` from the task's exec-node hash.
 *
 * Returns TSDB_CODE_SCH_IGNORE_ERROR when the removal fails (the execId is
 * already gone), or when execId is no longer the task's current execution or a
 * retry is pending. Either way, the caller should disregard the event.
 * A task without an exec-node hash is a no-op. `handle` is not used.
 */
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (pTask->execNodes == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool notFound = (0 != taosHashRemove(pTask->execNodes, &execId, sizeof(execId)));
  if (notFound) {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_TASK_DLOG("execId %d removed from execNodeList", execId);

  bool stale = (execId != pTask->execId) || pTask->waitRetry;
  if (stale) {  // ignore it
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Store `handle` on the exec-node entry recorded for `execId`.
 *
 * An empty exec-node hash or an unknown execId is silently ignored.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (taosHashGetSize(pTask->execNodes) > 0) {
    SSchNodeInfo *pEntry = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
    if (pEntry != NULL) {
      pEntry->handle = handle;
      SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
    } else {  // ignore it
      SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                    pTask->execId, pTask->waitRetry);
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * React to a connection handle reported for execution `execId`.
 *
 * With dropExecNode set, the exec-node entry is removed and that result is
 * returned as-is. Otherwise the handle is stored on the exec-node entry. If
 * execId is still the task's current execution and no retry is pending, the
 * handle also becomes the task's active handle. A stale execId yields
 * TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  // The result is not inspected: schUpdateTaskExecNode only ever returns success.
  (void)schUpdateTaskExecNode(pJob, pTask, handle, execId);

  bool isCurrent = (execId == pTask->execId) && !pTask->waitRetry;
  if (!isCurrent) {  // ignore it
    SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);

  return TSDB_CODE_SUCCESS;
}

/*
 * Handle a failure `errCode` reported for pTask.
 *
 * - TSDB_CODE_SCH_IGNORE_ERROR is passed straight back.
 * - If the job must stop (schJobNeedToStop), or the task has already finished
 *   (FAIL/SUCC), nothing is done and an ignore/status error is returned.
 * - Otherwise the failure timestamp is logged and schTaskCheckSetRetry()
 *   decides whether to retry. A retry goes through schHandleTaskRetry().
 * - A final failure marks the task FAIL. When the job waits for a whole level
 *   (SCH_JOB_NEED_WAIT), the level's failure counter is bumped and the job
 *   error code updated. TSDB_CODE_SCH_IGNORE_ERROR is returned while other
 *   tasks on the level are still pending; the job's error code is returned
 *   once the level is done. Without level waiting, errCode is returned.
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  int8_t jobStatus = 0;
  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  // A timeout is logged with the "wait" timestamp macro; any other error with the "end" macro.
  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
    SCH_LOG_TASK_WAIT_TS(pTask);
  } else {
    SCH_LOG_TASK_END_TS(pTask);
  }

  bool needRetry = false;

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

  if (SCH_JOB_NEED_WAIT(pJob)) {
    int32_t taskDone = 0;

    SCH_LOCK(SCH_WRITE, &pTask->level->lock);
    pTask->level->taskFailed++;
    taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

    schUpdateJobErrCode(pJob, errCode);

    if (taskDone < pTask->level->taskNum) {
      SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
      SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  SCH_RET(errCode);
}

/*
 * Handle the successful completion of pTask.
 *
 * The task is marked PART_SUCC and its successful address recorded, then
 * schLaunchTasksInFlowCtrlList() is called for it.
 *
 * A task without parents:
 * - If the job waits for a whole level (SCH_JOB_NEED_WAIT), the level's
 *   success counter is bumped. Once every task on the level is done, the job
 *   is failed (if any task failed) or switched to PART_SUCC.
 * - Otherwise its succeedAddr becomes the job's result node, it becomes the
 *   job's fetch task, and the job switches to PART_SUCC.
 *
 * A task with parents publishes its result location into each parent's
 * subplan (under the parent's planLock). It then launches every parent whose
 * ready-children count satisfies SCH_TASK_READY_FOR_LAUNCH, and finally calls
 * schLaunchJobLowerLevel().
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));

  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum == 0) {
    int32_t taskDone = 0;
    if (SCH_JOB_NEED_WAIT(pJob)) {
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      } else if (taskDone > pTask->level->taskNum) {
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
      }

      if (pTask->level->taskFailed > 0) {
        SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
      } else {
        SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
      }
    } else {
      pJob->resNode = pTask->succeedAddr;
    }

    pJob->fetchTask = pTask;

    SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
  }

  for (int32_t i = 0; i < parentNum; ++i) {
    SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);

    // Tell the parent's subplan where this child's output can be fetched from.
    SCH_LOCK(SCH_WRITE, &parent->planLock);
    SDownstreamSourceNode source = {
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
        .taskId = pTask->taskId,
        .schedId = schMgmt.sId,
        .execId = pTask->execId,
        .addr = pTask->succeedAddr,
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
    };
    qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
    SCH_UNLOCK(SCH_WRITE, &parent->planLock);

    int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
      SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
      SCH_ERR_RET(schLaunchTask(pJob, parent));
    }
  }

  SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

/*
 * Reschedule a timed-out task on another candidate node, when enabled.
 *
 * The task is rescheduled only if all of the following hold:
 * - rescheduling is enabled (schMgmt.cfg.enableReSchedule);
 * - the task is not data-bound;
 * - it has timed out (SCH_TASK_TIMEOUT) while in EXEC status;
 * - it is not the job's fetch task;
 * - it has more than one candidate address.
 *
 * In that case the current execution is dropped on its exec nodes, the
 * exec-node hash is cleared, and the task is routed through
 * schProcessOnTaskFailure() with TSDB_CODE_SCH_TIMEOUT_ERROR.
 */
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule) {
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  // Read the status through the same accessor used everywhere else in this file,
  // rather than touching pTask->status directly.
  if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask) && pJob->fetchTask != pTask &&
      taosArrayGetSize(pTask->candidateAddrs) > 1) {
    SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
    schDropTaskOnExecNode(pJob, pTask);
    taosHashClear(pTask->execNodes);

    SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));
  }

  return TSDB_CODE_SUCCESS;
}

343
/*
 * Advance the task's redirect back-off context and set pTask->delayExecMs for
 * the next launch.
 *
 * First redirect: initialise the context. The period is set to
 * tsRedirectPeriod and the start time to now (taosGetTimestampMs). Attempts
 * per round are:
 * - for data-bound tasks, the epset size, taken from pEpSet or, when it is
 *   NULL, from the first candidate address;
 * - for other tasks, 1.
 * delayExecMs is not modified on this path.
 *
 * Later redirects: count the attempt. After a full round, multiply the period
 * by tsRedirectFactor (capped at tsRedirectMaxPeriod) and delay by
 * min(period, remaining wait budget). If more than tsMaxRetryWaitTime has
 * elapsed since the first redirect, return the code chosen by
 * SCH_GET_REDICT_CODE (when non-zero) instead. Within a round, the delay is 0.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR when a data-bound task without pEpSet
 * has no candidate address to size the round from. The check runs before the
 * context is modified, and avoids a NULL dereference.
 */
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) {
  SSchRedirectCtx *pCtx = &pTask->redirectCtx;
  if (!pCtx->inRedirect) {
    int32_t roundTotal = 1;
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      if (pEpSet) {
        roundTotal = pEpSet->numOfEps;
      } else {
        SQueryNodeAddr *pAddr = pTask->candidateAddrs ? taosArrayGet(pTask->candidateAddrs, 0) : NULL;
        if (NULL == pAddr) {
          SCH_TASK_ELOG("no candidate addr to start redirect, addr num %d",
                        (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
        roundTotal = pAddr->epSet.numOfEps;
      }
    }

    pCtx->inRedirect = true;
    pCtx->periodMs = tsRedirectPeriod;
    pCtx->startTs = taosGetTimestampMs();
    pCtx->roundTotal = roundTotal;

    goto _return;
  }

  pCtx->totalTimes++;
  pCtx->roundTimes++;

  if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
    pCtx->roundTotal = pEpSet->numOfEps;
  }

  if (pCtx->roundTimes >= pCtx->roundTotal) {
    int64_t nowTs = taosGetTimestampMs();
    int64_t lastTime = nowTs - pCtx->startTs;
    if (lastTime > tsMaxRetryWaitTime) {
      SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
                    nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
      SCH_ERR_RET(SCH_GET_REDICT_CODE(pJob, rspCode));
    }

    pCtx->periodMs *= tsRedirectFactor;
    if (pCtx->periodMs > tsRedirectMaxPeriod) {
      pCtx->periodMs = tsRedirectMaxPeriod;
    }

    int64_t leftTime = tsMaxRetryWaitTime - lastTime;
    pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;

    pCtx->roundTimes = 0;

    goto _return;
  }

  pTask->delayExecMs = 0;

_return:

  SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal,
                pCtx->totalTimes, pTask->delayExecMs);

  return TSDB_CODE_SUCCESS;
}

404
/*
 * Redirect pTask after a redirect-class error `rspCode`.
 *
 * `pData` is the response that triggered the redirect, or NULL when this is a
 * child being redirected together with its parent (see the children loop at
 * the end). A NULL pData resets the task's retry counter.
 *
 * Steps:
 * 1. Advance the redirect back-off context (schChkUpdateRedirectCtx).
 * 2. Tear down the current execution: drop it on its exec nodes, clear
 *    execNodes, remove it from the job's exec list, deregister its heartbeat,
 *    decrement the level's launched counter, and free the cached message.
 * 3. Data-bound task: adopt the new epset when one was supplied. Otherwise keep
 *    the current endpoint (SYNC_SELF_LEADER_REDIRECT_ERROR) or switch to the
 *    next one. Then reset the status to INIT and schedule a delayed launch.
 * 4. Other (merge) task: clear its subplan's execution nodes, switch candidate
 *    when there is no response payload, reset to INIT, and redirect each child
 *    (under the child's task lock).
 *
 * Any error goes through schProcessOnTaskFailure() and its result is returned.
 * A NULL candidate address is reported as TSDB_CODE_SCH_INTERNAL_ERROR instead
 * of being dereferenced.
 */
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));

  if (NULL == pData) {
    pTask->retryTimes = 0;
  }

  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
    SCH_UPDATE_REDICT_CODE(pJob, rspCode);
  }

  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));

  pTask->waitRetry = true;

  // Tear down the current execution before relaunching.
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);
  schRemoveTaskFromExecList(pJob, pTask);
  schDeregisterTaskHb(pJob, pTask);
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
  taosMemoryFreeClear(pTask->msg);
  pTask->msgLen = 0;
  pTask->lastMsgType = 0;
  memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    if (pData && pData->pEpSet) {
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
    } else {
      SQueryNodeAddr *addr =
          pTask->candidateAddrs ? taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx) : NULL;
      if (NULL == addr) {
        SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                      (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
        // Keep the endpoint currently in use.
        SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
        SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
                      addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
      } else {
        SCH_SWITCH_EPSET(addr);
        SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
      }
    }

    if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
      if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) {
        SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
      }
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  // merge plan

  pTask->childReady = 0;

  qClearSubplanExecutionNode(pTask->plan);

  // Note: current error task and upper level merge task
  if (NULL == pData || 0 == pData->len) {
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  int32_t childrenNum = taosArrayGetSize(pTask->children);
  for (int32_t i = 0; i < childrenNum; ++i) {
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
    SCH_LOCK_TASK(pChild);
    // Best effort: a child's own failure is already routed through
    // schProcessOnTaskFailure() inside its redirect, so the result is not propagated.
    (void)schDoTaskRedirect(pJob, pChild, NULL, rspCode);
    SCH_UNLOCK_TASK(pChild);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

486
/*
 * Entry point for a redirect-class error `rspCode` received for pTask.
 *
 * If the job is in PART_SUCC and its results have already been fetched, the
 * redirect is refused and the task fails with rspCode. Otherwise a PART_SUCC
 * job is moved back to EXEC. An "other leader" redirect must carry an epset;
 * one without an epset fails the task with TSDB_CODE_INVALID_MSG.
 *
 * The response payload and epset in pData are always freed (and reset to
 * NULL) before returning. Returns the result of schDoTaskRedirect(), or the
 * result of schProcessOnTaskFailure() on an early error.
 */
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  if (JOB_TASK_STATUS_PART_SUCC == pJob->status) {
    // Sample `fetched` under the lock and release it exactly once, so no path
    // can unlock resLock twice.
    SCH_LOCK(SCH_WRITE, &pJob->resLock);
    bool fetched = pJob->fetched;
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

    if (fetched) {
      SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
    }

    schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
  }

  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
    if (NULL == pData || NULL == pData->pEpSet) {
      SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
      code = TSDB_CODE_INVALID_MSG;
      goto _return;
    }
  }

  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
  if (pData) {
    taosMemoryFreeClear(pData->pData);
    taosMemoryFreeClear(pData->pEpSet);
  }

  SCH_RET(code);

_return:

  if (pData) {
    taosMemoryFreeClear(pData->pData);
    taosMemoryFreeClear(pData->pEpSet);
  }

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

/*
 * Register pTask in the job's execTasks hash (key: taskId, value: task pointer).
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR if the task is already registered, or
 * TSDB_CODE_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putRc = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (putRc == 0) {
    SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(putRc)) {
    SCH_TASK_ELOG("task already in execTask list, code:%x", putRc);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
  SCH_RET(TSDB_CODE_OUT_OF_MEMORY);
}

/*
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
557
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
584
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
609
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
610 611 612 613 614 615 616 617 618 619 620 621 622
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}
*/

/*
 * Decide whether pTask may be retried after `errCode`, writing the answer to
 * *needRetry.
 *
 * A timeout first grants one extra retry and one extra execution, and doubles
 * the task timeout (capped at SCH_MAX_TASK_TIMEOUT_USEC). No retry is allowed
 * when:
 * - the retry budget is exhausted;
 * - the execution budget is exhausted;
 * - SCH_NEED_RETRY rejects the error for the last message type.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    pTask->maxExecTimes++;
    pTask->maxRetryTimes++;

    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  *needRetry = false;

  if (pTask->retryTimes + 1 > pTask->maxRetryTimes) {
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->execId + 1 >= pTask->maxExecTimes) {
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

/*
 * Relaunch pTask after a retryable failure.
 *
 * Steps:
 * 1. Decrement the level's launched counter and remove the task from the exec
 *    list.
 * 2. Reset the status to INIT, run the flow-control launcher when flow control
 *    applies, and deregister the heartbeat.
 * 3. Move to another target: data-bound tasks switch endpoint within the
 *    current candidate's epset; other tasks switch candidate address.
 * 4. Call schLaunchTask().
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR, instead of dereferencing NULL, when a
 * data-bound task's candidateIdx does not select a candidate address.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  schRemoveTaskFromExecList(pJob, pTask);
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SQueryNodeAddr *addr = pTask->candidateAddrs ? taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx) : NULL;
    if (NULL == addr) {
      SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                    (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
    SCH_SWITCH_EPSET(addr);
  } else {
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

/*
 * Append the address of every node in the job's node list to
 * pTask->candidateAddrs.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if an append fails, or
 * TSDB_CODE_TSC_NO_EXEC_NODE if no address was added (missing or empty node
 * list).
 */
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t nodeNum = pJob->nodeList ? taosArrayGetSize(pJob->nodeList) : 0;
  int32_t addNum = 0;

  for (int32_t idx = 0; idx < nodeNum; ++idx) {
    SQueryNodeLoad *pLoad = taosArrayGet(pJob->nodeList, idx);
    SQueryNodeAddr *pNodeAddr = &pLoad->addr;

    if (taosArrayPush(pTask->candidateAddrs, pNodeAddr) == NULL) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", idx, pNodeAddr->nodeId,
                  pNodeAddr->epSet.inUse, pNodeAddr->epSet.numOfEps, SCH_GET_CUR_EP(pNodeAddr)->fqdn,
                  SCH_GET_CUR_EP(pNodeAddr)->port);

    addNum++;
  }

  if (addNum > 0) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
  SCH_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
}

/*
 * Build pTask->candidateAddrs the first time it is needed. If the list is
 * already set, nothing is done.
 *
 * Candidate sources, in order:
 * - the plan's explicit execNode, when it has endpoints;
 * - an error, for data-bound tasks, which must have an explicit execNode;
 * - otherwise, every node in the job's node list (schSetAddrsFromNodeList).
 *
 * On any failure after the list is allocated, it is destroyed and reset to
 * NULL. The "already set" check above would otherwise accept a half-built
 * (possibly empty) list on the next call.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
    SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  SCH_ERR_JRET(schSetAddrsFromNodeList(pJob, pTask));

  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroy(pTask->candidateAddrs);
  pTask->candidateAddrs = NULL;

  SCH_RET(code);
}

775
/*
 * Replace the epset of a task's single candidate address with `pEpSet`
 * (typically the epset carried by a redirect response).
 *
 * Returns TSDB_CODE_APP_ERROR when pEpSet is NULL, or when the task does not
 * have exactly one candidate address.
 */
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
  if (NULL == pEpSet) {
    SCH_TASK_ELOG("not able to update cndidate addr, null epset");
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);

  // schDumpEpSet returns a string that is freed below. Guard a NULL result,
  // because passing NULL to %s is undefined behavior.
  char *origEpset = schDumpEpSet(&pAddr->epSet);
  char *newEpset = schDumpEpSet(pEpSet);

  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset ? origEpset : "null",
                newEpset ? newEpset : "null");

  taosMemoryFree(origEpset);
  taosMemoryFree(newEpset);

  memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
797
/*
 * Move pTask->candidateIdx to another candidate address.
 *
 * With at most one candidate, nothing changes. SCH_RANDOM picks a random index
 * different from the current one. Every other policy (SCH_LOAD_SEQ, SCH_ALL,
 * and anything else) advances round-robin, wrapping to 0.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

  if (candidateNum > 1) {
    if (SCH_RANDOM == schMgmt.cfg.schPolicy) {
      // Keep drawing until the index changes. With more than one candidate,
      // this is expected to finish after a few draws.
      int32_t prevIdx = pTask->candidateIdx;
      do {
        pTask->candidateIdx = taosRand() % candidateNum;
      } while (pTask->candidateIdx == prevIdx);
    } else {
      int32_t nextIdx = pTask->candidateIdx + 1;
      pTask->candidateIdx = (nextIdx >= candidateNum) ? 0 : nextIdx;
    }
  }

  SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
827 828 829
/*
 * Remove pTask from the job's execTasks hash.
 *
 * A task that is not present is only logged as a warning. Always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t rmRc = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (rmRc != 0) {
    SCH_TASK_WLOG("task already not in execTask list, code:%x", rmRc);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Ask every execution node recorded in pTask->execNodes to drop the task.
 *
 * Only nodes that have a non-NULL handle are sent a TDMT_SCH_DROP_TASK
 * message. This is best effort: a failed send is logged, and the remaining
 * nodes are still processed.
 */
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);

  if (size <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t       i = 0;
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
  while (nodeInfo) {
    if (nodeInfo->handle) {
      // Point the task's handle at this node before sending.
      // NOTE(review): presumably schBuildAndSendMsg uses it as the connection — confirm.
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
      int32_t code = schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK);
      if (TSDB_CODE_SUCCESS != code) {
        // Best effort: report the failure but keep dropping on the other nodes.
        SCH_TASK_WLOG("failed to send drop msg to task's %dth execNode, error:%s", i, tstrerror(code));
      } else {
        SCH_TASK_DLOG("start to drop task's %dth execNode", i);
      }
    } else {
      SCH_TASK_DLOG("no need to drop task %dth execNode", i);
    }

    ++i;
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
}

867 868
/*
 * Handle the list of task statuses carried in a heartbeat response from the
 * node identified by pEpId.
 *
 * Each entry's job and task are resolved with schProcessOnCbBegin; entries
 * that cannot be resolved are skipped. A resolved entry is always finished
 * with schProcessOnCbEnd:
 *   - execId differs from the task's current execId -> logged only;
 *   - status JOB_TASK_STATUS_INIT                   -> task is rescheduled;
 *   - any other status (including FAIL)             -> no action yet.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList);

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t idx = 0; idx < taskNum; ++idx) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, idx);

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
           pStatus->execId, jobTaskStatusStr(pStatus->status));

    SSchJob  *pJob = NULL;
    SSchTask *pTask = NULL;
    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
      continue;
    }

    int32_t code = 0;
    if (pStatus->execId != pTask->execId) {
      // TODO: a stale execution reported by the server is only logged for now.
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
    } else if (JOB_TASK_STATUS_INIT == pStatus->status) {
      code = schRescheduleTask(pJob, pTask);
    }
    // JOB_TASK_STATUS_FAIL and other statuses: TODO record and handle the error.

    schProcessOnCbEnd(pJob, pTask, code);
  }

  return TSDB_CODE_SUCCESS;
}

909 910 911 912 913 914 915
/*
 * Dispatch locally produced EXPLAIN responses to their tasks.
 *
 * pExplainRes is an array of SExplainLocalRsp. For each entry the job is
 * acquired by refId, the task is looked up by taskId, the response is passed
 * to schProcessExplainRsp, and the job is released again. Processing stops at
 * the first failure:
 *   - the job no longer exists (TSDB_CODE_QRY_JOB_NOT_EXIST);
 *   - the job needs to stop (TSDB_CODE_SCH_IGNORE_ERROR);
 *   - task lookup or response processing fails.
 *
 * Ownership: pExplainRes is always consumed. An entry that was processed
 * successfully is emptied (numOfPlans = 0, subplanInfo = NULL). The final
 * cleanup pass calls tFreeSExplainRsp on every entry and then destroys the
 * array.
 */
int32_t schHandleExplainRes(SArray *pExplainRes) {
  int32_t code = 0;
  int32_t resNum = taosArrayGetSize(pExplainRes);

  // With resNum <= 0 the loop body never runs and control falls into the cleanup below.
  for (int32_t idx = 0; idx < resNum; ++idx) {
    SExplainLocalRsp *pLocalRsp = taosArrayGet(pExplainRes, idx);
    SSchTask         *pTask = NULL;
    int8_t            status = 0;

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", pLocalRsp->qId,
           pLocalRsp->tId);

    SSchJob *pJob = schAcquireJob(pLocalRsp->rId);
    if (NULL == pJob) {
      qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, pLocalRsp->qId,
            pLocalRsp->tId, pLocalRsp->rId);
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
    }

    if (schJobNeedToStop(pJob, &status)) {
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
      schReleaseJob(pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    code = schGetTaskInJob(pJob, pLocalRsp->tId, &pTask);
    if (TSDB_CODE_SUCCESS == code) {
      code = schProcessExplainRsp(pJob, pTask, &pLocalRsp->rsp);
    }

    schReleaseJob(pJob->refId);

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", pLocalRsp->qId,
           pLocalRsp->tId, code);

    SCH_ERR_JRET(code);

    // Empty the handed-off entry so the cleanup pass below does not free its plans.
    // NOTE(review): presumably schProcessExplainRsp took ownership of them — confirm.
    pLocalRsp->rsp.numOfPlans = 0;
    pLocalRsp->rsp.subplanInfo = NULL;
  }

_return:
  for (int32_t idx = 0; idx < resNum; ++idx) {
    SExplainLocalRsp *pLocalRsp = taosArrayGet(pExplainRes, idx);
    tFreeSExplainRsp(&pLocalRsp->rsp);
  }

  taosArrayDestroy(pExplainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
969 970
/*
 * Launch pTask on a remote node.
 *
 * Steps:
 *   1. Serialize the subplan into pTask->msg if no message exists yet; an
 *      existing message is reused. When tsQueryPlannerTrace is enabled, the
 *      plan text is also logged.
 *   2. Resolve the candidate addresses.
 *   3. For query jobs, make sure a heartbeat connection exists.
 *   4. Send the message with the plan's msgType.
 * The first error encountered is returned.
 */
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
  SSubplan *plan = pTask->plan;

  if (NULL == pTask->msg) {
    // TODO add more detailed reason for failure
    int32_t serializeCode = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
    if (TSDB_CODE_SUCCESS != serializeCode) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(serializeCode), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(serializeCode);
    }

    if (tsQueryPlannerTrace) {
      char   *planStr = NULL;
      int32_t planLen = 0;
      SCH_ERR_RET(qSubPlanToString(plan, &planStr, &planLen));
      SCH_TASK_DLOGL("physical plan len:%d, %s", planLen, planStr);
      taosMemoryFree(planStr);
    }
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
}

/*
 * Execute pTask in-process through the client-side query worker instead of
 * sending it to a remote node. The query worker is created lazily on first use.
 *
 * For EXPLAIN jobs, the local explain responses are collected into an array of
 * SExplainLocalRsp and passed to schHandleExplainRes, which always consumes the
 * array. On success the task is completed via schProcessOnTaskSuccess.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the explain result array cannot be
 * allocated; otherwise returns the first error from the steps above.
 */
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  // NOTE(review): this lazy initialisation is not synchronized; confirm that concurrent
  // local launches cannot both observe NULL here.
  if (NULL == schMgmt.queryMgmt) {
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
  }

  SArray *explainRes = NULL;
  int32_t code = 0;
  SQWMsg  qwMsg = {0};
  qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
  qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
  qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
  qwMsg.msg = pTask->plan;
  qwMsg.msgType = pTask->plan->msgType;
  qwMsg.connInfo.handle = pJob->conn.pTrans;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      // Do not hand a NULL result array to the query worker for an EXPLAIN job.
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
                                        pTask->execId, &qwMsg, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys explainRes even on failure, so return directly
    // rather than jumping to _return (which would destroy it a second time).
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
1034 1035
int32_t schLaunchTaskImpl(void *param) {
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
dengyihao's avatar
dengyihao 已提交
1036
  SSchJob     *pJob = schAcquireJob(pCtx->jobRid);
1037 1038
  if (NULL == pJob) {
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
dengyihao's avatar
dengyihao 已提交
1039
    taosMemoryFree(param);
1040 1041
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
  }
dengyihao's avatar
dengyihao 已提交
1042

D
dapan1121 已提交
1043
  SSchTask *pTask = pCtx->pTask;
H
Haojun Liao 已提交
1044 1045 1046 1047

  if (pCtx->asyncLaunch) {
    SCH_LOCK_TASK(pTask);
  }
H
Hongze Cheng 已提交
1048 1049 1050

  int8_t  status = 0;
  int32_t code = 0;
D
dapan1121 已提交
1051 1052 1053

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
  pTask->execId++;
D
dapan1121 已提交
1054
  pTask->retryTimes++;
D
dapan1121 已提交
1055
  pTask->waitRetry = false;
D
dapan1121 已提交
1056

H
Hongze Cheng 已提交
1057 1058
  SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d",
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
D
dapan1121 已提交
1059 1060 1061 1062

  SCH_LOG_TASK_START_TS(pTask);

  if (schJobNeedToStop(pJob, &status)) {
D
dapan1121 已提交
1063
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
D
dapan1121 已提交
1064
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
D
dapan1121 已提交
1065 1066 1067 1068
  }

  // NOTE: race condition: the task should be put into the hash table before send msg to server
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
D
dapan1121 已提交
1069
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
D
dapan1121 已提交
1070 1071 1072
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
  }

D
dapan1121 已提交
1073
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
D
dapan1121 已提交
1074 1075 1076
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
  } else {
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
D
dapan1121 已提交
1077 1078
  }

1079
#if 0
D
dapan1121 已提交
1080
  if (SCH_IS_QUERY_JOB(pJob)) {
D
dapan1121 已提交
1081
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
D
dapan1121 已提交
1082
  }
1083
#endif
D
dapan1121 已提交
1084

D
dapan1121 已提交
1085 1086
_return:

D
dapan1121 已提交
1087 1088 1089 1090 1091 1092 1093
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    if (code) {
      code = schProcessOnTaskFailure(pJob, pTask, code);
    }
    if (code) {
      code = schHandleJobFailure(pJob, code);
    }
D
dapan1121 已提交
1094 1095
  }

H
Haojun Liao 已提交
1096 1097 1098 1099
  if (pCtx->asyncLaunch) {
    SCH_UNLOCK_TASK(pTask);
  }

1100 1101
  schReleaseJob(pJob->refId);

H
Haojun Liao 已提交
1102 1103
  taosMemoryFree(param);

D
dapan1121 已提交
1104 1105 1106
  SCH_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1107
/*
 * Launch pTask through schLaunchTaskImpl.
 *
 * If the job has at least SCH_MIN_AYSNC_EXEC_NUM tasks, the launch is
 * submitted to an async worker; otherwise it runs synchronously in the
 * caller's thread. The SSchTaskCtx allocated here is normally freed by
 * schLaunchTaskImpl. If the async submission itself fails, the context is
 * freed here and the error is returned.
 */
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
  if (NULL == param) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  param->jobRid = pJob->refId;
  param->pTask = pTask;

  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    param->asyncLaunch = true;
    int32_t code = taosAsyncExec(schLaunchTaskImpl, param, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      // The callback was not scheduled, so nothing else will free the context.
      taosMemoryFree(param);
      SCH_ERR_RET(code);
    }
  } else {
    SCH_ERR_RET(schLaunchTaskImpl(param));
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Launch pTask, honouring per-job flow control.
 *
 * The task's handle is reset first. When flow control applies, the task is
 * launched only if schCheckIncTaskFlowQuota reports enough quota; otherwise it
 * is not launched here. Any failure is passed to schProcessOnTaskFailure, so
 * callers need no further error processing.
 */
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  bool    enough = false;
  int32_t code = 0;
  bool    launchNow = true;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
    launchNow = enough;
  }

  if (launchNow) {
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

1150 1151 1152 1153 1154
void schHandleTimerEvent(void *param, void *tmrId) {
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
  SSchTask       *pTask = NULL;
  SSchJob        *pJob = NULL;
  int32_t         code = 0;
dengyihao's avatar
dengyihao 已提交
1155

dengyihao's avatar
dengyihao 已提交
1156 1157 1158 1159 1160 1161
  int64_t  rId = pTimerParam->rId;
  uint64_t queryId = pTimerParam->queryId;
  uint64_t taskId = pTimerParam->taskId;
  taosMemoryFree(pTimerParam);

  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, taskId)) {
1162 1163
    return;
  }
1164

1165
  code = schLaunchTask(pJob, pTask);
1166

1167
  schProcessOnCbEnd(pJob, pTask, code);
1168 1169 1170 1171 1172 1173
}

/*
 * Launch pTask after pTask->delayExecMs milliseconds, or immediately when no
 * positive delay is configured.
 *
 * The delayed launch runs in schHandleTimerEvent, which receives and frees a
 * heap-allocated SSchTimerParam identifying the job and task. A task that
 * already has a delayTimer is re-armed with taosTmrReset instead of getting a
 * new timer.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the parameter cannot be allocated or the
 * timer cannot be started. In the latter case the parameter is freed here.
 */
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->delayExecMs <= 0) {
    SCH_RET(schLaunchTask(pJob, pTask));
  }

  SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
  if (NULL == param) {
    SCH_TASK_ELOG("taosMemoryMalloc %d failed", (int)sizeof(SSchTimerParam));
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  param->rId = pJob->refId;
  param->queryId = pJob->queryId;
  param->taskId = pTask->taskId;

  if (NULL == pTask->delayTimer) {
    pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
    if (NULL == pTask->delayTimer) {
      SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
      // The timer callback will never run, so it cannot free param; do it here.
      taosMemoryFree(param);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    return TSDB_CODE_SUCCESS;
  }

  // NOTE(review): the taosTmrReset result is ignored. If this replaces a still-pending timer,
  // the SSchTimerParam that timer carried is presumably never freed — confirm against ttimer semantics.
  taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212
/*
 * Launch every task in level->subTasks.
 *
 * Before launching, the job's flow-control need for this level is evaluated
 * with schChkJobNeedFlowCtrl. Stops at and returns the first error.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  int32_t idx = 0;
  while (idx < level->taskNum) {
    SSchTask *pSubTask = taosArrayGet(level->subTasks, idx);
    SCH_ERR_RET(schLaunchTask(pJob, pSubTask));
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Drop every task stored in list, whose values are SSchTask pointers.
 *
 * For each task, under the task lock, any pending delayTimer is stopped and
 * the task's exec nodes are asked to drop it. Does nothing unless
 * SCH_JOB_NEED_DROP(pJob) holds.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_JOB_NEED_DROP(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); NULL != pIter; pIter = taosHashIterate(list, pIter)) {
    SSchTask *pTask = *(SSchTask **)pIter;

    SCH_LOCK_TASK(pTask);

    if (pTask->delayTimer) {
      taosTmrStopA(&pTask->delayTimer);
    }
    schDropTaskOnExecNode(pJob, pTask);

    SCH_UNLOCK_TASK(pTask);
  }
}

D
dapan1121 已提交
1232 1233 1234 1235 1236
/*
 * Send a fetch request for pTask to the node recorded in pJob->resNode, using
 * the message type chosen by SCH_FETCH_TYPE. The only caller visible here
 * passes pJob->fetchTask. Previously the parameter was ignored in favour of
 * pJob->fetchTask; it is now honoured, matching schExecLocalFetch.
 */
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
  SCH_RET(schBuildAndSendMsg(pJob, pTask, &pJob->resNode, SCH_FETCH_TYPE(pTask)));
}

/*
 * Fetch the result of pTask from the in-process query worker.
 *
 * For EXPLAIN jobs, the local explain responses are collected into an array of
 * SExplainLocalRsp and passed to schHandleExplainRes, which always consumes
 * the array. The fetched response is then processed by schProcessFetchRsp.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the explain result array cannot be
 * allocated; otherwise returns the first error from the steps above.
 */
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  void   *pRsp = NULL;
  int32_t code = 0;
  SArray *explainRes = NULL;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      // Do not hand a NULL result array to the query worker for an EXPLAIN job.
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
                                        pTask->execId, &pRsp, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys explainRes even on failure, so return directly
    // rather than jumping to _return (which would destroy it a second time).
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
1262 1263 1264 1265
/*
 * Fetch the job's result through pJob->fetchTask, either locally or remotely.
 *
 * If pJob->fetchRes is already set, nothing is done. A fetch failure is passed
 * to schProcessOnTaskFailure, so callers need no further error processing.
 */
int32_t schLaunchFetchTask(SSchJob *pJob) {
  int32_t code = 0;

  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
  if (NULL != fetchRes) {
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
    return TSDB_CODE_SUCCESS;
  }

  code = SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask) ? schExecLocalFetch(pJob, pJob->fetchTask)
                                                       : schExecRemoteFetch(pJob, pJob->fetchTask);
  SCH_ERR_JRET(code);

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}