schTask.c 38.6 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
H
Hongze Cheng 已提交
19
#include "qworker.h"
D
dapan1121 已提交
20
#include "schInt.h"
H
Hongze Cheng 已提交
21
#include "tglobal.h"
D
dapan1121 已提交
22 23 24 25 26 27
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

/*
 * Release the resources attached to a task: heartbeat registration, candidate
 * address array, pending message buffer, children/parents arrays, exec-node
 * hash and exec-time profile array. The SSchTask object itself is not freed.
 *
 * Each container pointer is reset to NULL after it is destroyed so that a
 * repeated call cannot destroy the same container twice.
 */
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  schDeregisterTaskHb(pJob, pTask);

  if (pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
    pTask->candidateAddrs = NULL;
  }

  taosMemoryFreeClear(pTask->msg);

  if (pTask->children) {
    taosArrayDestroy(pTask->children);
    pTask->children = NULL;
  }

  if (pTask->parents) {
    taosArrayDestroy(pTask->parents);
    pTask->parents = NULL;
  }

  if (pTask->execNodes) {
    taosHashCleanup(pTask->execNodes);
    pTask->execNodes = NULL;
  }

  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
}

D
dapan1121 已提交
50
/*
 * Compute the retry and execution limits of a task.
 *
 * maxRetryTimes is SCH_DEFAULT_MAX_RETRY_NUM for data-bind tasks, non-query
 * jobs, or any scheduling policy other than SCH_ALL; otherwise it is the larger
 * of the job's node count and SCH_DEFAULT_MAX_RETRY_NUM.
 * maxExecTimes is maxRetryTimes scaled by (pLevel->level + 1).
 */
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  if (SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy)) {
    pTask->maxRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM;
  } else {
    int32_t nodeNum = (int32_t)taosArrayGetSize(pJob->nodeList);
    pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM);
  }

  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}

D
dapan1121 已提交
61
/*
 * Initialize a task for the given subplan and level.
 *
 * Sets the plan/level links, resets execId (-1) and failedExecId (-2), assigns
 * the default timeout and a freshly generated task id, computes the retry
 * limits, and creates the exec-node hash and exec-time array sized by
 * maxExecTimes. On success the task status becomes JOB_TASK_STATUS_INIT.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code (TSDB_CODE_OUT_OF_MEMORY) when
 * either container cannot be created.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->execId = -1;
  pTask->failedExecId = -2;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->taskId = schGenTaskId();

  schInitTaskRetryTimes(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  SCH_TASK_DLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);

  return TSDB_CODE_SUCCESS;

_return:

  // Clear the pointers after releasing them: schFreeTask() destroys both fields
  // again, so leaving them dangling would lead to a double free.
  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  SCH_RET(code);
}

/*
 * Remember the candidate address the task succeeded on.
 *
 * Local-exec tasks are skipped. Otherwise the address at candidateIdx is copied
 * into pTask->succeedAddr; a missing entry is reported as
 * TSDB_CODE_SCH_INTERNAL_ERROR.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (NULL == addr) {
    SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                  (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  pTask->succeedAddr = *addr;

  return TSDB_CODE_SUCCESS;
}

/*
 * Register the node used for execution `execId` in the task's exec-node hash.
 *
 * The stored entry holds a copy of *addr together with the task's current
 * handle. Returns TSDB_CODE_OUT_OF_MEMORY when the hash insertion fails.
 */
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};

  if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle);

  return TSDB_CODE_SUCCESS;
}

/*
 * Remove the exec-node entry recorded for `execId`.
 *
 * Returns TSDB_CODE_SUCCESS when the task has no exec-node hash, or when the
 * entry was removed and `execId` is the task's current execution and the task
 * is not waiting for a retry. Returns TSDB_CODE_SCH_IGNORE_ERROR when the entry
 * was already gone or the response belongs to a stale execution.
 * `handle` is not used by this function.
 */
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (NULL == pTask->execNodes) {
    return TSDB_CODE_SUCCESS;
  }

  if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  } else {
    SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
  }

  if ((execId != pTask->execId) || pTask->waitRetry) {  // ignore it
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Store `handle` in the exec-node entry recorded for `execId`.
 *
 * Nothing is done when the exec-node hash is empty or has no entry for
 * `execId`. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (taosHashGetSize(pTask->execNodes) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
  if (NULL == nodeInfo) {  // ignore it
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
    return TSDB_CODE_SUCCESS;
  }

  nodeInfo->handle = handle;

  SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);

  return TSDB_CODE_SUCCESS;
}

/*
 * Apply a handle reported for execution `execId`.
 *
 * With dropExecNode set, the exec-node entry is removed instead and that
 * result is returned. Otherwise the handle is stored in the exec-node entry
 * and, if `execId` is still the task's current execution (not failed, not
 * waiting for retry), it also becomes the task handle. Stale responses yield
 * TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  schUpdateTaskExecNode(pJob, pTask, handle, execId);

  if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) {  // ignore it
    SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);

  return TSDB_CODE_SUCCESS;
}

/*
 * Handle a failed task execution.
 *
 * - TSDB_CODE_SCH_IGNORE_ERROR is passed straight back to the caller.
 * - The current execId is recorded as failedExecId.
 * - If the job needs to stop, TSDB_CODE_SCH_IGNORE_ERROR is returned.
 * - A task already in FAIL/SUCC state yields TSDB_CODE_SCH_STATUS_ERROR.
 * - Otherwise schTaskCheckSetRetry() decides whether to retry:
 *     retry    -> schHandleTaskRetry(), TSDB_CODE_SUCCESS on success;
 *     no retry -> task marked FAIL; for jobs that wait for all tasks the level
 *                 counters and the job error code are updated, returning
 *                 TSDB_CODE_SCH_IGNORE_ERROR until the whole level is done and
 *                 the job error code afterwards; other jobs get errCode back.
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  pTask->failedExecId = pTask->execId;

  int8_t jobStatus = 0;
  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
    SCH_LOG_TASK_WAIT_TS(pTask);
  } else {
    SCH_LOG_TASK_END_TS(pTask);
  }

  bool    needRetry = false;
  int32_t taskDone = 0;

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (!needRetry) {
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

    if (SCH_JOB_NEED_WAIT(pJob)) {
      // Count this failure under the level lock, then decide on the snapshot.
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskFailed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      schUpdateJobErrCode(pJob, errCode);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
        SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
      }

      SCH_RET(atomic_load_32(&pJob->errCode));
    }
  } else {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  SCH_RET(errCode);
}

/*
 * Handle a successfully executed task.
 *
 * The task is marked PART_SUCC, its succeed node is recorded and tasks waiting
 * in the flow-control list are launched.
 *
 * Task without parents:
 *   - job that waits for all tasks: the level's success counter is bumped; the
 *     function returns while other tasks are pending, otherwise the job is
 *     failed (if any task of the level failed) or switched to PART_SUCC;
 *   - other jobs: the task becomes the job's result node and fetch task, and
 *     the job is switched to PART_SUCC.
 *
 * Task with parents: each parent's plan receives this task as a downstream
 * source (under the parent's plan lock), the parent's ready counter is
 * incremented and the parent is launched once it is ready. Finally the job's
 * lower level is launched.
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));

  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum == 0) {
    int32_t taskDone = 0;
    if (SCH_JOB_NEED_WAIT(pJob)) {
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      } else if (taskDone > pTask->level->taskNum) {
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
      }

      if (pTask->level->taskFailed > 0) {
        SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
      } else {
        SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
      }
    } else {
      pJob->resNode = pTask->succeedAddr;
    }

    pJob->fetchTask = pTask;

    SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
  }

  /*
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
      strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
      job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;

      ++job->dataSrcEps.numOfEps;
    }
  */

  for (int32_t i = 0; i < parentNum; ++i) {
    SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);

    // The parent's plan is updated under its plan lock.
    SCH_LOCK(SCH_WRITE, &parent->planLock);
    SDownstreamSourceNode source = {
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
        .taskId = pTask->taskId,
        .schedId = schMgmt.sId,
        .execId = pTask->execId,
        .addr = pTask->succeedAddr,
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
    };
    qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
    SCH_UNLOCK(SCH_WRITE, &parent->planLock);

    int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
      SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
      SCH_ERR_RET(schLaunchTask(pJob, parent));
    }
  }

  SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

/*
 * Reschedule a timed-out task onto another candidate node.
 *
 * Nothing happens when rescheduling is disabled in the scheduler config or the
 * task is a data-bind task. A task is rescheduled only if it timed out, is in
 * EXEC state, is not the job's fetch task and has more than one candidate
 * address; it is then dropped on its exec nodes, its exec-node hash is cleared
 * and the failure path is run with TSDB_CODE_SCH_TIMEOUT_ERROR.
 */
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule) {
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask &&
      taosArrayGetSize(pTask->candidateAddrs) > 1) {
    SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
    schDropTaskOnExecNode(pJob, pTask);
    taosHashClear(pTask->execNodes);

    SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));
  }

  return TSDB_CODE_SUCCESS;
}

346
/*
 * Update the task's redirect context and compute the delay before the next
 * launch (pTask->delayExecMs).
 *
 * First redirect: the context is started (period = tsRedirectPeriod, start
 * timestamp = now) and the round size is set to the number of endpoints of the
 * supplied epset, or of the first candidate address, for data-bind tasks, and
 * to 1 otherwise.
 *
 * Later redirects: counters are incremented. When a full round has been tried,
 * the elapsed time is compared with tsMaxRetryWaitTime; on timeout the job is
 * flagged noMoreRetry and the redirect code is returned as an error. Otherwise
 * the period is multiplied by tsRedirectFactor (capped at tsRedirectMaxPeriod),
 * the delay is set to min(remaining time, period) and the round restarts.
 * Within a round the delay is 0.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR if a data-bind task has no candidate
 * address to derive the round size from.
 */
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) {
  SSchRedirectCtx *pCtx = &pTask->redirectCtx;
  if (!pCtx->inRedirect) {
    // Resolve the round size before touching the context, so that a failure
    // leaves the context unchanged.
    int32_t roundTotal = 1;
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      if (pEpSet) {
        roundTotal = pEpSet->numOfEps;
      } else {
        SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
        if (NULL == pAddr) {
          SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", 0,
                        (int32_t)taosArrayGetSize(pTask->candidateAddrs));
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
        roundTotal = pAddr->epSet.numOfEps;
      }
    }

    pCtx->inRedirect = true;
    pCtx->periodMs = tsRedirectPeriod;
    pCtx->startTs = taosGetTimestampMs();
    pCtx->roundTotal = roundTotal;

    goto _return;
  }

  pCtx->totalTimes++;
  pCtx->roundTimes++;

  if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
    pCtx->roundTotal = pEpSet->numOfEps;
  }

  if (pCtx->roundTimes >= pCtx->roundTotal) {
    int64_t nowTs = taosGetTimestampMs();
    int64_t lastTime = nowTs - pCtx->startTs;
    if (lastTime > tsMaxRetryWaitTime) {
      SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
                    nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
      pJob->noMoreRetry = true;
      SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode));
    }

    pCtx->periodMs *= tsRedirectFactor;
    if (pCtx->periodMs > tsRedirectMaxPeriod) {
      pCtx->periodMs = tsRedirectMaxPeriod;
    }

    // Never wait longer than the time left in the overall retry budget.
    int64_t leftTime = tsMaxRetryWaitTime - lastTime;
    pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;

    pCtx->roundTimes = 0;

    goto _return;
  }

  pTask->delayExecMs = 0;

_return:

  SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal,
                pCtx->totalTimes, pTask->delayExecMs);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
408
/*
 * Put a task back into a launchable state before a retry.
 *
 * Marks the task as waiting for retry, drops it on its exec nodes, stops a
 * pending delay timer, clears the exec-node hash, removes the task from the
 * job's exec list, deregisters its heartbeat, frees the pending message and
 * resets message length/type, the child-ready counter and the succeed address.
 */
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
  pTask->waitRetry = true;

  schDropTaskOnExecNode(pJob, pTask);
  if (pTask->delayTimer) {
    taosTmrStopA(&pTask->delayTimer);
  }
  taosHashClear(pTask->execNodes);
  schRemoveTaskFromExecList(pJob, pTask);
  schDeregisterTaskHb(pJob, pTask);
  taosMemoryFreeClear(pTask->msg);
  pTask->msgLen = 0;
  pTask->lastMsgType = 0;
  pTask->childReady = 0;
  memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}

/*
 * Redirect a task after a redirect-type response.
 *
 * The redirect context is updated first (this may end the retry cycle with an
 * error), then the task is reset for retry.
 *
 * Data-bind task: the candidate address is replaced by the epset carried in
 * pData if present; otherwise the current endpoint is kept for
 * SYNC_SELF_LEADER_REDIRECT_ERROR or switched to the next one. The task goes
 * back to INIT and is launched after the computed delay.
 *
 * Merge task: the subplan's execution nodes are cleared, the candidate address
 * is switched when no payload came with the response, the task goes back to
 * INIT and every child task is redirected recursively under its task lock.
 *
 * Any failure is routed through schProcessOnTaskFailure(). pData may be NULL.
 */
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));

  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
    SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
  }

  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));

  schResetTaskForRetry(pJob, pTask);

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    if (pData && pData->pEpSet) {
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
    } else {
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
      if (NULL == addr) {
        // Same guard as schRecordTaskSucceedNode(): never dereference a missing candidate.
        SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
        SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
        SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
                      addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
      } else {
        SCH_SWITCH_EPSET(addr);
        SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse,
                      addr->epSet.numOfEps);
      }
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  // merge plan

  pTask->childReady = 0;

  qClearSubplanExecutionNode(pTask->plan);

  // Note: current error task and upper level merge task
  if (NULL == pData || 0 == pData->len) {
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  int32_t childrenNum = (int32_t)taosArrayGetSize(pTask->children);
  for (int32_t i = 0; i < childrenNum; ++i) {
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
    SCH_LOCK_TASK(pChild);
    // The child's result is not propagated; its own failure path runs inside the call.
    schDoTaskRedirect(pJob, pChild, NULL, rspCode);
    SCH_UNLOCK_TASK(pChild);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

D
dapan1121 已提交
487
/*
 * Restart the current task set after a redirect-type error.
 *
 * After schChkResetJobRetry() succeeds, an "other leader" redirect must carry
 * an epset; if it does not, the task fails with TSDB_CODE_INVALID_MSG. The
 * per-level launch/done counters and the job level index are reset and the
 * task is redirected.
 *
 * The payload buffers in pData (pData, pEpSet) are released on every path.
 * pData may be NULL, matching what schDoTaskRedirect() accepts.
 */
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));

  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
    if (NULL == pData || NULL == pData->pEpSet) {
      SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
      code = TSDB_CODE_INVALID_MSG;
      goto _return;
    }
  }

  SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));

  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    pLevel->taskExecDoneNum = 0;
    pLevel->taskLaunchedNum = 0;
  }

  SCH_RESET_JOB_LEVEL_IDX(pJob);

  // schDoTaskRedirect() runs the task failure path itself when it fails.
  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);

  if (pData) {
    taosMemoryFreeClear(pData->pData);
    taosMemoryFreeClear(pData->pEpSet);
  }

  SCH_RET(code);

_return:

  if (pData) {
    taosMemoryFreeClear(pData->pData);
    taosMemoryFreeClear(pData->pEpSet);
  }

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

/*
 * Add the task to the job's exec-task hash, keyed by task id.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR if the task is already present and
 * TSDB_CODE_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      SCH_TASK_ELOG("task already in execTask list, code:%x", code);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

/*
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
560
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
587
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));

  return TSDB_CODE_SUCCESS;
}

int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
S
Shengliang Guan 已提交
612
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
613 614 615 616 617 618 619 620 621 622 623
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}
*/

/*
 * Decide whether a failed task should be retried; the answer is written to
 * *needRetry. Always returns TSDB_CODE_SUCCESS.
 *
 * No retry when: the job is flagged noMoreRetry, the retry count would exceed
 * maxRetryTimes, the next execId would reach maxExecTimes, or
 * SCH_TASK_NEED_RETRY() rejects the error for the last message type.
 *
 * A timeout error first extends both limits by one and doubles the task
 * timeout, capped at SCH_MAX_TASK_TIMEOUT_USEC.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  if (pJob->noMoreRetry) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    pTask->maxExecTimes++;
    pTask->maxRetryTimes++;
    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if ((pTask->execId + 1) >= pTask->maxExecTimes) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  /*
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
        *needRetry = false;
        SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
                      SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
        return TSDB_CODE_SUCCESS;
      }
    } else {
      int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

      if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
        *needRetry = false;
        SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                      pTask->candidateIdx, candidateNum);
        return TSDB_CODE_SUCCESS;
      }
    }
  */

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

/*
 * Relaunch a task that is allowed to retry.
 *
 * The level's launched counter is decremented, the task leaves the exec list
 * and returns to INIT, flow-controlled tasks are given a chance to start and
 * the heartbeat is deregistered. A data-bind task then switches to the next
 * endpoint of its current candidate; any other task switches to another
 * candidate address. Finally the task is launched again.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR if a data-bind task has no candidate
 * address at candidateIdx.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  schRemoveTaskFromExecList(pJob, pTask);
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
    if (NULL == addr) {
      // Same guard as schRecordTaskSucceedNode(): never dereference a missing candidate.
      SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                    (int32_t)taosArrayGetSize(pTask->candidateAddrs));
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
    SCH_SWITCH_EPSET(addr);
  } else {
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

/*
 * Fill the task's candidate address array from the job's node list.
 *
 * Every node's address is appended in list order. Returns
 * TSDB_CODE_OUT_OF_MEMORY when an append fails and TSDB_CODE_TSC_NO_EXEC_NODE
 * when the job has no node list or the list is empty.
 */
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t addNum = 0;
  int32_t nodeNum = 0;

  if (pJob->nodeList) {
    nodeNum = (int32_t)taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum; ++i) {
      SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
      SQueryNodeAddr *naddr = &nload->addr;

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId,
                    naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn,
                    SCH_GET_CUR_EP(naddr)->port);

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Build the task's candidate address list, unless it already exists.
 *
 * If the plan names an exec node (its epset has endpoints), that node is the
 * only candidate. A data-bind task without such a node is an error
 * (TSDB_CODE_APP_ERROR). Any other task takes its candidates from the job's
 * node list.
 *
 * On failure the partially built array is released and the pointer reset, so
 * the early-return guard at the top cannot later mistake an empty list for a
 * completed one.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;

  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
    SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  SCH_ERR_JRET(schSetAddrsFromNodeList(pJob, pTask));

  /*
    for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
      epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];

      ++epSet->numOfEps;
    }
  */

  return TSDB_CODE_SUCCESS;

_return:

  // Drop the unusable list so that a later call rebuilds it from scratch.
  taosArrayDestroy(pTask->candidateAddrs);
  pTask->candidateAddrs = NULL;

  SCH_RET(code);
}

785
/*
 * Replace the epset of the task's single candidate address with *pEpSet.
 *
 * The task must have exactly one candidate address; otherwise
 * TSDB_CODE_APP_ERROR is returned. The old and new epsets are dumped for the
 * debug log and the dump buffers are freed afterwards.
 */
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);

  char *origEpset = schDumpEpSet(&pAddr->epSet);
  char *newEpset = schDumpEpSet(pEpSet);

  // Passing a null pointer to %s is undefined; fall back to a literal in case a dump failed.
  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset ? origEpset : "NULL",
                newEpset ? newEpset : "NULL");

  taosMemoryFree(origEpset);
  taosMemoryFree(newEpset);

  memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
807
/*
 * Move the task to another candidate address.
 *
 * With at most one candidate the index is left unchanged. Under SCH_RANDOM a
 * random index different from the current one is chosen; under every other
 * policy the index advances by one and wraps to 0. Always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  int32_t candidateNum = (int32_t)taosArrayGetSize(pTask->candidateAddrs);
  if (candidateNum <= 1) {
    goto _return;
  }

  switch (schMgmt.cfg.schPolicy) {
    case SCH_RANDOM: {
      // candidateNum > 1 here, so a different index always exists.
      int32_t lastIdx = pTask->candidateIdx;
      while (lastIdx == pTask->candidateIdx) {
        pTask->candidateIdx = taosRand() % candidateNum;
      }
      break;
    }
    case SCH_LOAD_SEQ:
    case SCH_ALL:
    default:
      if (++pTask->candidateIdx >= candidateNum) {
        pTask->candidateIdx = 0;
      }
      break;
  }

_return:

  SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
837 838 839
/*
 * Remove the task from the job's exec-task hash.
 *
 * A failed removal is only logged as a warning; the function always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (code) {
    SCH_TASK_WLOG("task already not in execTask list, code:%x", code);
  }

  return TSDB_CODE_SUCCESS;
}

// Sends a TDMT_SCH_DROP_TASK message for every entry of pTask->execNodes that
// has a non-empty handle. Entries without a handle are skipped. The result of
// schBuildAndSendMsg is not checked here.
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosHashGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t idx = 0;
  for (SSchNodeInfo *pNode = taosHashIterate(pTask->execNodes, NULL); pNode;
       pNode = taosHashIterate(pTask->execNodes, pNode), ++idx) {
    if (!pNode->handle) {
      SCH_TASK_DLOG("no need to drop task %dth execNode", idx);
      continue;
    }

    // the drop message is built against this node's handle
    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_SCH_DROP_TASK);
    SCH_TASK_DLOG("start to drop task's %dth execNode", idx);
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", nodeNum);
}

877 878
// Walks the task status entries carried by a heartbeat response from node pEpId.
// For each entry the job/task pair is looked up through schProcessOnCbBegin;
// entries whose lookup fails are skipped. A task reported in INIT status is
// rescheduled. Entries with a stale execId or FAIL status are only released.
// Always returns TSDB_CODE_SUCCESS.
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  int32_t   taskNum = (int32_t)taosArrayGetSize(pStatusList);
  SSchTask *pTask = NULL;
  SSchJob  *pJob = NULL;

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t idx = 0; idx < taskNum; ++idx) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, idx);
    int32_t      code = 0;

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
           pStatus->execId, jobTaskStatusStr(pStatus->status));

    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
      continue;
    }

    if (pStatus->execId != pTask->execId) {
      // TODO: status belongs to a different execution of this task
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
    } else if (JOB_TASK_STATUS_FAIL == pStatus->status) {
      // TODO: record and handle the reported error
    } else if (JOB_TASK_STATUS_INIT == pStatus->status) {
      code = schRescheduleTask(pJob, pTask);
    }

    // code stays 0 unless the reschedule above failed
    schProcessOnCbEnd(pJob, pTask, code);
  }

  return TSDB_CODE_SUCCESS;
}

919 920 921 922 923 924 925
// Processes the LOCAL explain responses collected in pExplainRes
// (an array of SExplainLocalRsp), one by one and in order.
//
// For each element the owning job is acquired by localRsp->rId, the task is
// looked up by localRsp->tId and the response is handed to schProcessExplainRsp.
// Processing stops at the first failure.
//
// Ownership: this function ALWAYS consumes pExplainRes. On every path it calls
// tFreeSExplainRsp on each element and destroys the array, so the caller must
// not touch pExplainRes afterwards. Elements that were processed successfully
// have their numOfPlans/subplanInfo cleared first, so their plan info is not
// released here (presumably it was taken over by schProcessExplainRsp - confirm).
//
// Returns 0 on success or the first error code encountered.
int32_t schHandleExplainRes(SArray *pExplainRes) {
  int32_t code = 0;
  int32_t resNum = taosArrayGetSize(pExplainRes);
  if (resNum <= 0) {
    goto _return;
  }

  SSchTask *pTask = NULL;
  SSchJob  *pJob = NULL;

  for (int32_t i = 0; i < resNum; ++i) {
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);

    pJob = schAcquireJob(localRsp->rId);
    if (NULL == pJob) {
      // separator added after the TID value so the ids and the message no longer run together
      qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId,
            localRsp->tId, localRsp->rId);
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
    }

    int8_t status = 0;
    if (schJobNeedToStop(pJob, &status)) {
      // pTask is still NULL at this point; the log macro is used as in the original code
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
      schReleaseJob(pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    code = schGetTaskInJob(pJob, localRsp->tId, &pTask);

    if (TSDB_CODE_SUCCESS == code) {
      code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
    }

    // the job reference is dropped before the result is evaluated, so it is
    // released on both the success and the failure path
    schReleaseJob(pJob->refId);

    qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId,
           localRsp->tId, code);

    SCH_ERR_JRET(code);

    // detach the plan info of a successfully processed response so the cleanup
    // loop below does not free it
    localRsp->rsp.numOfPlans = 0;
    localRsp->rsp.subplanInfo = NULL;
    pTask = NULL;
    pJob = NULL;
  }

_return:

  for (int32_t i = 0; i < resNum; ++i) {
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
    tFreeSExplainRsp(&localRsp->rsp);
  }

  taosArrayDestroy(pExplainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
979 980
// Launches pTask on a remote node.
// Steps: serialize the sub-plan into pTask->msg if that has not been done yet,
// set the task's candidate addresses, make sure the heartbeat connection
// exists for query jobs, then build and send the message of type plan->msgType.
// Returns the first error encountered, or the result of schBuildAndSendMsg.
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
  SSubplan *pPlan = pTask->plan;
  int32_t   code = 0;

  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    code = qSubPlanToMsg(pPlan, &pTask->msg, &pTask->msgLen);
    if (TSDB_CODE_SUCCESS == code) {
      if (tsQueryPlannerTrace) {
        // dump the textual plan only when planner tracing is switched on
        char   *planStr = NULL;
        int32_t planStrLen = 0;
        SCH_ERR_RET(qSubPlanToString(pPlan, &planStr, &planStrLen));
        SCH_TASK_DLOGL("physical plan len:%d, %s", planStrLen, planStr);
        taosMemoryFree(planStr);
      }
    } else {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, pPlan->msgType));
}

// Executes pTask in-process through the client-side query worker.
//
// The worker (schMgmt.queryMgmt) is created on first use. For explain jobs an
// array is allocated to collect SExplainLocalRsp entries from the worker; it is
// then passed to schHandleExplainRes, which consumes it on every path.
//
// Returns TSDB_CODE_OUT_OF_MEMORY if the explain result array cannot be
// allocated, otherwise the first error from the worker, explain handling or
// schProcessOnTaskSuccess, or 0 on success.
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  // SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
  if (NULL == schMgmt.queryMgmt) {
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
  }

  SArray *explainRes = NULL;
  int32_t code = 0;
  SQWMsg  qwMsg = {0};
  qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
  qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
  qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
  qwMsg.msg = pTask->plan;
  qwMsg.msgType = pTask->plan->msgType;
  qwMsg.connInfo.handle = pJob->conn.pTrans;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      // without the array the explain results would be silently dropped, so fail explicitly
      SCH_TASK_ELOG("taosArrayInit %d explain rsp failed", pJob->taskNum);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
                                        pTask->execId, &qwMsg, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys explainRes even on failure, so return
    // directly instead of jumping to _return (which would destroy it again)
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
1044 1045
int32_t schLaunchTaskImpl(void *param) {
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
dengyihao's avatar
dengyihao 已提交
1046
  SSchJob     *pJob = schAcquireJob(pCtx->jobRid);
1047 1048
  if (NULL == pJob) {
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
dengyihao's avatar
dengyihao 已提交
1049
    taosMemoryFree(param);
1050 1051
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
  }
dengyihao's avatar
dengyihao 已提交
1052

D
dapan1121 已提交
1053
  SSchTask *pTask = pCtx->pTask;
H
Haojun Liao 已提交
1054 1055 1056 1057

  if (pCtx->asyncLaunch) {
    SCH_LOCK_TASK(pTask);
  }
H
Hongze Cheng 已提交
1058 1059 1060

  int8_t  status = 0;
  int32_t code = 0;
D
dapan1121 已提交
1061 1062 1063

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
  pTask->execId++;
D
dapan1121 已提交
1064
  pTask->retryTimes++;
D
dapan1121 已提交
1065
  pTask->waitRetry = false;
D
dapan1121 已提交
1066

H
Hongze Cheng 已提交
1067 1068
  SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d",
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
D
dapan1121 已提交
1069 1070 1071 1072

  SCH_LOG_TASK_START_TS(pTask);

  if (schJobNeedToStop(pJob, &status)) {
D
dapan1121 已提交
1073
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
D
dapan1121 已提交
1074
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
D
dapan1121 已提交
1075 1076 1077
  }

  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
D
dapan1121 已提交
1078
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
D
dapan1121 已提交
1079 1080 1081
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
  }

D
dapan1121 已提交
1082
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
D
dapan1121 已提交
1083 1084 1085
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
  } else {
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
D
dapan1121 已提交
1086 1087
  }

1088
#if 0
D
dapan1121 已提交
1089
  if (SCH_IS_QUERY_JOB(pJob)) {
D
dapan1121 已提交
1090
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
D
dapan1121 已提交
1091
  }
1092
#endif
D
dapan1121 已提交
1093

D
dapan1121 已提交
1094 1095
_return:

D
dapan1121 已提交
1096 1097 1098 1099 1100 1101 1102
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    if (code) {
      code = schProcessOnTaskFailure(pJob, pTask, code);
    }
    if (code) {
      code = schHandleJobFailure(pJob, code);
    }
D
dapan1121 已提交
1103 1104
  }

H
Haojun Liao 已提交
1105 1106 1107 1108
  if (pCtx->asyncLaunch) {
    SCH_UNLOCK_TASK(pTask);
  }

1109 1110
  schReleaseJob(pJob->refId);

H
Haojun Liao 已提交
1111 1112
  taosMemoryFree(param);

D
dapan1121 已提交
1113 1114 1115
  SCH_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1116
// Builds the launch context for pTask and runs schLaunchTaskImpl with it.
// Jobs with at least SCH_MIN_AYSNC_EXEC_NUM tasks are launched through
// taosAsyncExec with asyncLaunch set; smaller jobs are launched inline and the
// launch result is returned. The context is freed by schLaunchTaskImpl.
// Returns TSDB_CODE_OUT_OF_MEMORY if the context cannot be allocated.
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  SSchTaskCtx *pCtx = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
  if (NULL == pCtx) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pCtx->jobRid = pJob->refId;
  pCtx->pTask = pTask;

  if (pJob->taskNum < SCH_MIN_AYSNC_EXEC_NUM) {
    // small job: launch on the calling thread (asyncLaunch stays zero from calloc)
    SCH_ERR_RET(schLaunchTaskImpl(pCtx));
    return TSDB_CODE_SUCCESS;
  }

  pCtx->asyncLaunch = true;
  taosAsyncExec(schLaunchTaskImpl, pCtx, NULL);

  return TSDB_CODE_SUCCESS;
}

// Note: no more error processing, handled in function internal
//
// Clears the task handle and launches pTask. When the task is subject to flow
// control, the quota is checked first and the task is launched only if the
// quota is sufficient; otherwise the function returns success without
// launching. Any error is passed to schProcessOnTaskFailure, whose result is
// returned.
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  bool    quotaEnough = false;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &quotaEnough));

    if (!quotaEnough) {
      // not launched now
      return TSDB_CODE_SUCCESS;
    }
  }

  SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

1159 1160 1161 1162 1163
void schHandleTimerEvent(void *param, void *tmrId) {
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
  SSchTask       *pTask = NULL;
  SSchJob        *pJob = NULL;
  int32_t         code = 0;
dengyihao's avatar
dengyihao 已提交
1164

dengyihao's avatar
dengyihao 已提交
1165 1166 1167 1168 1169 1170
  int64_t  rId = pTimerParam->rId;
  uint64_t queryId = pTimerParam->queryId;
  uint64_t taskId = pTimerParam->taskId;
  taosMemoryFree(pTimerParam);

  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, taskId)) {
1171 1172
    return;
  }
1173

1174
  code = schLaunchTask(pJob, pTask);
1175

1176
  schProcessOnCbEnd(pJob, pTask, code);
1177 1178 1179 1180 1181 1182
}

// Launches pTask, optionally after a delay.
//
// With pTask->delayExecMs <= 0 the task is launched immediately through
// schLaunchTask. Otherwise a SSchTimerParam carrying the job/task ids is
// allocated and handed to a timer that fires schHandleTimerEvent after
// delayExecMs; that callback frees the param. The task's existing delayTimer
// is reset if there is one, otherwise a new timer is started.
//
// Returns TSDB_CODE_OUT_OF_MEMORY if the param cannot be allocated or the
// timer cannot be started; in the latter case the param is released here
// because no callback will ever receive it.
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->delayExecMs <= 0) {
    SCH_RET(schLaunchTask(pJob, pTask));
  }

  SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
  if (NULL == param) {
    SCH_TASK_ELOG("taosMemoryMalloc %d failed", (int)sizeof(SSchTimerParam));
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  param->rId = pJob->refId;
  param->queryId = pJob->queryId;
  param->taskId = pTask->taskId;

  if (NULL == pTask->delayTimer) {
    pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
    if (NULL == pTask->delayTimer) {
      SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
      // the timer never took ownership of param, so free it to avoid a leak
      taosMemoryFree(param);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    return TSDB_CODE_SUCCESS;
  }

  taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221
// Launches every task of the given level in array order, after checking
// whether the job needs flow control for that level. Stops at and returns the
// first error; returns TSDB_CODE_SUCCESS when all tasks were launched.
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  int32_t idx = 0;
  while (idx < level->taskNum) {
    SSchTask *pTask = taosArrayGet(level->subTasks, idx);

    SCH_ERR_RET(schLaunchTask(pJob, pTask));
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

// For jobs that need dropping (SCH_JOB_NEED_DROP), visits every task pointer
// stored in the hash list and, under the task lock, stops its pending delay
// timer (if any) and drops the task on its exec nodes. Does nothing for other
// jobs.
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_JOB_NEED_DROP(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); pIter; pIter = taosHashIterate(list, pIter)) {
    // each hash entry holds a pointer to the task
    SSchTask *pTask = *(SSchTask **)pIter;

    SCH_LOCK_TASK(pTask);
    if (pTask->delayTimer) {
      taosTmrStopA(&pTask->delayTimer);
    }
    schDropTaskOnExecNode(pJob, pTask);
    SCH_UNLOCK_TASK(pTask);
  }
}

D
dapan1121 已提交
1241 1242 1243 1244 1245
// Sends the fetch request for the job's fetch task to pJob->resNode.
// Note that the message is built from pJob->fetchTask; the pTask argument is
// not used. Returns the result of schBuildAndSendMsg.
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
  int32_t sendCode = schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask));

  SCH_RET(sendCode);
}

// Fetches the result of a locally executed task from the client-side query
// worker and feeds it to schProcessFetchRsp.
//
// For explain jobs an array is allocated to collect SExplainLocalRsp entries
// from the worker; it is then passed to schHandleExplainRes, which consumes it
// on every path.
//
// Returns TSDB_CODE_OUT_OF_MEMORY if the explain result array cannot be
// allocated, otherwise the first error from the worker, explain handling or
// schProcessFetchRsp, or 0 on success.
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  void   *pRsp = NULL;
  int32_t code = 0;
  SArray *explainRes = NULL;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      // without the array the explain results would be silently dropped, so fail explicitly
      SCH_TASK_ELOG("taosArrayInit %d explain rsp failed", pJob->taskNum);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
                                        pTask->execId, &pRsp, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys explainRes even on failure, so return
    // directly instead of jumping to _return (which would destroy it again)
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}

D
dapan1121 已提交
1271 1272 1273 1274
// Note: no more error processing, handled in function internal
//
// Starts fetching the job result unless pJob->fetchRes is already set. The
// job's fetch task is put into FETCH status and the fetch is executed locally
// or remotely depending on the task. Any error is passed to
// schProcessOnTaskFailure, whose result is returned.
int32_t schLaunchFetchTask(SSchJob *pJob) {
  int32_t code = 0;

  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
  if (NULL != fetchRes) {
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
    return TSDB_CODE_SUCCESS;
  }

  SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);

  if (!SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
    SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
  } else {
    SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}