scheduler.c 50.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15
 */

D
dapan1121 已提交
16
#include "schedulerInt.h"
H
Hongze Cheng 已提交
17
#include "tmsg.h"
18
#include "query.h"
D
dapan 已提交
19
#include "catalog.h"
20

21
static SSchedulerMgmt schMgmt = {0};
D
dapan1121 已提交
22

D
dapan1121 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
// Produces a task id from the scheduler-wide counter schMgmt.taskId.
// The counter is bumped by one through atomic_add_fetch_64 and that call's
// result is returned as the id.
uint64_t schGenTaskId(void) {
  return atomic_add_fetch_64(&schMgmt.taskId, 1);
}

// Builds a 64-bit id out of four bit fields:
//   bits 52..63  low 12 bits of a hash of the system UUID
//   bits 40..51  low 12 bits of the process id
//   bits 16..39  low 24 bits of the current millisecond timestamp
//   bits  0..15  low 16 bits of a per-process serial counter
// The system-UUID hash is computed on first use and cached in a function-level
// static; if the lookup fails the error is logged and the hash stays 0, so the
// lookup is attempted again on the next call.
uint64_t schGenUUID(void) {
  static uint64_t machineHash = 0;
  static int32_t  serialNo = 0;

  if (0 == machineHash) {
    char    sysUid[64];
    int32_t rc = taosGetSystemUUID(sysUid, tListLen(sysUid));
    if (TSDB_CODE_SUCCESS == rc) {
      machineHash = MurmurHash3_32(sysUid, strlen(sysUid));
    } else {
      qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
  }

  int64_t  nowMs = taosGetTimestampMs();
  uint64_t procId = taosGetPId();
  int32_t  seq = atomic_add_fetch_32(&serialNo, 1);

  // Assemble the fields one at a time, highest field first.
  uint64_t uuid = (machineHash & 0x0FFF) << 52;
  uuid |= (procId & 0x0FFF) << 40;
  uuid |= (nowMs & 0xFFFFFF) << 16;
  uuid |= (seq & 0xFFFF);

  return uuid;
}

D
dapan1121 已提交
49 50 51 52 53

// Prepares a task for scheduling: binds it to its subplan and level, marks it
// JOB_TASK_STATUS_NOT_START, assigns a fresh task id and allocates the array
// that records the nodes the task gets executed on.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the execAddrs
// array cannot be allocated.
int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) {
  pTask->plan  = pPlan;
  pTask->level = pLevel;
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
  pTask->taskId = schGenTaskId();

  pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
  if (pTask->execAddrs != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);

  return TSDB_CODE_SUCCESS;
}

// Releases the resources a task holds: the candidate address list, the
// message buffer, the child/parent pointer arrays and the executed-node list.
// The task structure itself is not freed here.
void schFreeTask(SSchTask* pTask) {
  if (NULL != pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
  }

  tfree(pTask->msg);

  if (NULL != pTask->children) {
    taosArrayDestroy(pTask->children);
  }

  if (NULL != pTask->parents) {
    taosArrayDestroy(pTask->parents);
  }

  if (NULL != pTask->execAddrs) {
    taosArrayDestroy(pTask->execAddrs);
  }
}


// Checks that a message received for a task is one the task can accept.
//
// For the known message types the message must be the counterpart of the last
// message sent (the code relies on the received type being lastMsgType + 1),
// and the task must currently be EXECUTING or PARTIAL_SUCCEED.
//
// Returns TSDB_CODE_SUCCESS when the message is acceptable,
// TSDB_CODE_SCH_STATUS_ERROR on a type or status mismatch, and
// TSDB_CODE_QRY_INVALID_INPUT for a message type that is not handled here.
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
  int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType);

  switch (msgType) {
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask));

      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      break;

    case TDMT_VND_CREATE_TABLE_RSP:
    case TDMT_VND_SUBMIT_RSP:
    case TDMT_VND_QUERY_RSP:
    case TDMT_VND_RES_READY_RSP:
    case TDMT_VND_FETCH_RSP:
    case TDMT_VND_DROP_TASK:
      // NOTE(review): TDMT_VND_DROP_TASK is listed among the *_RSP types - confirm this is intended.
      if ((msgType - 1) != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%d, rspType:%d", lastMsgType, msgType);
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (!(SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING || SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_PARTIAL_SUCCEED)) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType);
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      break;
  }

  return TSDB_CODE_SUCCESS;
}


// Moves the job to newStatus if that is a legal transition from its current
// status, using a compare-and-exchange loop so a concurrent status change
// causes the transition to be re-validated against the fresh status.
//
// Allowed transitions:
//   NULL            -> NOT_START
//   NOT_START       -> EXECUTING
//   EXECUTING       -> PARTIAL_SUCCEED | FAILED | CANCELLING | CANCELLED | DROPPING
//   PARTIAL_SUCCEED -> FAILED | SUCCEED | DROPPING
//   SUCCEED, FAILED, CANCELLING -> DROPPING
//   CANCELLED, DROPPING         -> none (TSDB_CODE_QRY_JOB_FREED)
//
// Returns TSDB_CODE_SUCCESS once the status has been swapped in. A transition
// to the current status, or any transition not listed above, is logged and
// rejected with TSDB_CODE_QRY_APP_ERROR (or TSDB_CODE_QRY_JOB_FREED).
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    if (newStatus == oriStatus) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        if (JOB_TASK_STATUS_NOT_START != newStatus) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_NOT_START:
        if (JOB_TASK_STATUS_EXECUTING != newStatus) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_EXECUTING:
        if (!(newStatus == JOB_TASK_STATUS_PARTIAL_SUCCEED
           || newStatus == JOB_TASK_STATUS_FAILED
           || newStatus == JOB_TASK_STATUS_CANCELLING
           || newStatus == JOB_TASK_STATUS_CANCELLED
           || newStatus == JOB_TASK_STATUS_DROPPING)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_PARTIAL_SUCCEED:
        if (!(newStatus == JOB_TASK_STATUS_FAILED
           || newStatus == JOB_TASK_STATUS_SUCCEED
           || newStatus == JOB_TASK_STATUS_DROPPING)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_SUCCEED:
      case JOB_TASK_STATUS_FAILED:
      case JOB_TASK_STATUS_CANCELLING:
        if (JOB_TASK_STATUS_DROPPING != newStatus) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_CANCELLED:
      case JOB_TASK_STATUS_DROPPING:
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;

      default:
        SCH_JOB_ELOG("invalid job status:%d", oriStatus);
        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    // Swap only if nobody changed the status meanwhile; otherwise re-validate.
    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %d to %d", oriStatus, newStatus);
      break;
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus);
  SCH_ERR_RET(code);
}


D
dapan1121 已提交
195
// Links every task of the job to its child and parent tasks.
//
// For each task, the children/parents of its subplan are looked up in
// planToTask (keyed by the subplan pointer, holding a pointer to the task) and
// the resulting task pointers are appended to pTask->children / pTask->parents.
//
// Structural checks performed along the way:
//   - a task on the level equal to pJob->levelIdx must have no children;
//   - a task on level 0 must have no parents, every other task must have some;
//   - for a query job, level 0 must hold at most one task.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR for an inconsistent
// plan, or TSDB_CODE_QRY_OUT_OF_MEMORY when an array cannot be grown.
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0;
      int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan **child = taosArrayGet(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan **parent = taosArrayGet(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n indexes pParents here, so it is reported as parentIdx.
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  // A query job must converge into a single task on level 0.
  SSchLevel *pTopLevel = taosArrayGet(pJob->levels, 0);
  if (pJob->attr.queryJob && pTopLevel->taskNum > 1) {
    SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pTopLevel->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
277

D
dapan 已提交
278
// Remembers which candidate node the task succeeded on by copying the
// candidate address selected by pTask->candidateIdx into pTask->succeedAddr.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when the index
// does not resolve to an entry of pTask->candidateAddrs.
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  int32_t         candIdx = atomic_load_8(&pTask->candidateIdx);
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, candIdx);

  if (pAddr == NULL) {
    SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", candIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  pTask->succeedAddr = *pAddr;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
291 292 293 294 295 296 297 298

// Appends addr to the list of nodes the task has been executed on
// (pTask->execAddrs).
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the push fails.
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr) {
  void *pushed = taosArrayPush(pTask->execAddrs, addr);

  if (pushed == NULL) {
    SCH_TASK_ELOG("taosArrayPush addr to execAddr list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
300

D
dapan1121 已提交
301

D
dapan1121 已提交
302
// Validates the query DAG and builds the job's level/task structure from it.
//
// One SSchLevel is created per entry of pDag->pSubplans and one SSchTask per
// subplan of that level. While tasks are created, a temporary hash
// (subplan pointer -> pointer to the task stored in the level array) is filled
// so schBuildTaskRalation() can resolve parent/child links; the hash is
// released before returning on every path that created it.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for an empty or
// malformed DAG, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure, or the
// error reported by schInitTask()/schBuildTaskRalation().
int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)taosArrayGetSize(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == planToTask) {
    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  pJob->subPlans = pDag->pSubplans;

  SSchLevel level = {0};
  SArray *plans = NULL;
  int32_t taskNum = 0;
  SSchLevel *pLevel = NULL;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = taosArrayGetP(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)taosArrayGetSize(plans);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = taosArrayGetP(plans, n);

      SCH_SET_JOB_TYPE(&pJob->attr, plan->type);

      SSchTask  task = {0};
      SSchTask *pTask = &task;

      SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));

      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        // The task never made it into the level array, so nothing else will
        // release what schInitTask() allocated for it.
        schFreeTask(pTask);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      // Map the subplan to the task copy that now lives inside the level array.
      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }

    SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:
  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
405 406
// Fills pTask->candidateAddrs with the nodes the task may be sent to.
//
// If the list already exists nothing is done. Otherwise the list is created
// and populated either with the execNode carried by the task's plan (when it
// has at least one endpoint) or with up to SCH_MAX_CANDIDATE_EP_NUM entries of
// pJob->nodeList.
//
// On any failure the list is destroyed and reset to NULL, so a later call
// rebuilds it instead of returning success on an empty or partial list.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation
// failure, or TSDB_CODE_QRY_INVALID_INPUT when no candidate node is available.
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t addNum = 0;
  int32_t nodeNum = 0;

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // A plan that names its own exec node is only ever sent there.
  if (pTask->plan->execNode.epset.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epset.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  if (pJob->nodeList) {
    nodeNum = (int32_t)taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    code = TSDB_CODE_QRY_INVALID_INPUT;
    goto _return;
  }

  return TSDB_CODE_SUCCESS;

_return:

  // Failure only: drop the unusable list so the early-return guard at the top
  // does not treat it as a valid, already-populated candidate set.
  taosArrayDestroy(pTask->candidateAddrs);
  pTask->candidateAddrs = NULL;

  return code;
}
D
dapan1121 已提交
461

D
dapan1121 已提交
462
// Registers the task in the job's executing-task hash (pJob->execTasks),
// keyed by task id.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR when the task is
// already registered, or TSDB_CODE_QRY_OUT_OF_MEMORY for any other put failure.
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);

  if (code != 0) {
    if (!HASH_NODE_EXIST(code)) {
      SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_ELOG("task already in execTask list, code:%x", code);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
479 480 481
// Moves the task from the executing-task hash to the succeeded-task hash.
//
// *moved is reset to false on entry (as schMoveTaskToFailList does) and set to
// true when the task ends up in pJob->succTasks, including when it was already
// there. A failure to remove the task from pJob->execTasks is only logged.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_STATUS_ERROR when the task was
// already in the succeeded list, or TSDB_CODE_QRY_OUT_OF_MEMORY for any other
// put failure (in which case *moved stays false).
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Do not let a stale value from the caller leak through the failure path.
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_ELOG("task already in succTask list, status:%d", SCH_GET_TASK_STATUS(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
504 505 506 507 508
// Moves the task from the executing-task hash to the failed-task hash.
//
// *moved starts out false and becomes true when the task ends up in
// pJob->failTasks, including when it was already there. A failure to remove
// the task from pJob->execTasks is only logged.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_STATUS_ERROR when the task was
// already in the failed list, or TSDB_CODE_QRY_OUT_OF_MEMORY for any other put
// failure.
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  if (taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId)) != 0) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
  }

  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (code != 0) {
    if (!HASH_NODE_EXIST(code)) {
      SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    *moved = true;

    SCH_TASK_WLOG("task already in failTask list, status:%d", SCH_GET_TASK_STATUS(pTask));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557

// Moves the task from the succeeded-task hash back to the executing-task hash.
//
// *moved is reset to false on entry (as schMoveTaskToFailList does) and set to
// true when the task ends up in pJob->execTasks, including when it was already
// there. A failure to remove the task from pJob->succTasks is only logged.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_STATUS_ERROR when the task was
// already in the executing list, or TSDB_CODE_QRY_OUT_OF_MEMORY for any other
// put failure (in which case *moved stays false).
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Callers reuse one flag across several move calls; clear whatever a
  // previous call left in it so the failure path reports "not moved".
  *moved = false;

  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_ELOG("task already in execTask list, status:%d", SCH_GET_TASK_STATUS(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
558
// Decides whether a failed task should be retried.
//
// Placeholder implementation: retry is never requested, so *needRetry is
// always set to false and TSDB_CODE_SUCCESS is returned.
//
// Still to be implemented:
//   - decide based on task type, errCode, retry count, job status and the
//     endpoints still available;
//   - when a retry is needed, record the retry info on the task;
//   - update the task's candidateIdx;
//   - record tasks that failed but were tried.
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
  *needRetry = false;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
569
// Common handling for a job that ends in `status` (FAILED or DROPPING).
//
// The job status is switched first; if that transition is rejected (for
// example the job is already in that status) the rejection code is returned
// and nothing else happens. Otherwise a non-zero errCode is stored on the job,
// the response semaphore is posted when the user is fetching or when the job
// is a synchronous job that needs no fetch, and the job's stored error code is
// returned.
//
// Always returns a non-zero code once the status switch succeeded: the stored
// job error, or TSDB_CODE_SCH_INTERNAL_ERROR when no error code was ever
// recorded for the job.
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
  // if already FAILED, no more processing
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));

  if (errCode) {
    atomic_store_32(&pJob->errCode, errCode);
  }

  if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) {
    tsem_post(&pJob->rspSem);
  }

  int32_t code = atomic_load_32(&pJob->errCode);
  SCH_ERR_RET(code);

  // Reaching this point means the job failed without any error code recorded.
  // Report it and return a definite error instead of running off the end of
  // the function.
  SCH_JOB_ELOG("job errCode is invalid, errCode:%d", code);

  return TSDB_CODE_SCH_INTERNAL_ERROR;
}



D
dapan1121 已提交
589 590 591 592 593 594 595 596 597 598 599
// Marks the job as failed: delegates to schProcessOnJobFailureImpl() with
// JOB_TASK_STATUS_FAILED as the target status and forwards its result.
// Note: callers need no further error processing; it is handled inside this function.
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode));
}

// Marks the job as being dropped: delegates to schProcessOnJobFailureImpl()
// with JOB_TASK_STATUS_DROPPING as the target status and forwards its result.
// Note: callers need no further error processing; it is handled inside this function.
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode));
}


D
dapan1121 已提交
600
// Note: no more error processing, handled in function internal
D
dapan1121 已提交
601
int32_t schFetchFromRemote(SSchJob *pJob) {
D
dapan 已提交
602 603
  int32_t code = 0;
  
D
dapan1121 已提交
604 605
  if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) {
    SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
D
dapan 已提交
606 607 608
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
609 610 611 612 613 614 615
  void *res = atomic_load_ptr(&pJob->res);
  if (res) {
    atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

    SCH_JOB_DLOG("res already fetched, res:%p", res);
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
616 617

  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
D
dapan 已提交
618 619 620 621

  return TSDB_CODE_SUCCESS;
  
_return:
D
dapan1121 已提交
622 623

  atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
D
dapan 已提交
624

D
dapan1121 已提交
625 626
  schProcessOnJobFailure(pJob, code);

D
dapan 已提交
627
  return code;
D
dapan 已提交
628 629
}

D
dapan 已提交
630

D
dapan1121 已提交
631 632 633
// Handles the job reaching JOB_TASK_STATUS_PARTIAL_SUCCEED.
//
// After the status switch, the response semaphore is posted for a synchronous
// job that needs no fetch, and a remote fetch is started when the user is
// already waiting for data (pJob->userFetch).
//
// Note: callers need no further error processing. A failed remote fetch is
// routed through schProcessOnJobFailure() here.
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  int32_t code = 0;

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED));

  if (!(SCH_JOB_NEED_FETCH(&pJob->attr) || !pJob->attr.syncSchedule)) {
    tsem_post(&pJob->rspSem);
  }

  if (!atomic_load_8(&pJob->userFetch)) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schFetchFromRemote(pJob));

  return TSDB_CODE_SUCCESS;

_return:

  SCH_ERR_RET(schProcessOnJobFailure(pJob, code));

  SCH_RET(code);
}

D
dapan1121 已提交
654
// Called once fetched data has arrived: clears the job's in-flight fetch flag
// (remoteFetch 1 -> 0) and posts the response semaphore.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t schProcessOnDataFetched(SSchJob *job) {
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
  tsem_post(&job->rspSem);

  // The function is declared int32_t, so it must return a defined value.
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
659 660 661
// Handles the failure of a single task.
//
// If schTaskCheckAndSetRetry() asks for a retry the task is launched again.
// Otherwise the task is moved to the failed list (when it was EXECUTING) and
// marked FAILED. For a task that must wait for all tasks of its level, the
// level's failed counter is bumped and errCode is stored on the job; the job
// is only failed once every task of the level is done. Finally the job is
// failed through schProcessOnJobFailure().
//
// Note: callers need no further error processing; it is handled inside this function.
// Returns TSDB_CODE_SUCCESS only when the task was relaunched successfully;
// otherwise errCode or the code reported while failing the job.
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  bool needRetry = false;
  bool moved = false;
  int32_t taskDone = 0;
  int32_t code = 0;

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));

  if (!needRetry) {
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

    if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
      code = schMoveTaskToFailList(pJob, pTask, &moved);
      // Already in the failed list: this failure was processed before.
      if (code && moved) {
        SCH_ERR_RET(errCode);
      }
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);

    if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskFailed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      atomic_store_32(&pJob->errCode, errCode);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("not all tasks done, done:%d, all:%d", taskDone, pTask->level->taskNum);
        SCH_ERR_RET(errCode);
      }
    }
  } else {
    // Note: no more error processing, already handled
    SCH_ERR_RET(schLaunchTask(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

_return:

  SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode));

  // SCH_RET is used so that a value is returned even when errCode is 0;
  // SCH_ERR_RET would fall off the end of the function in that case.
  SCH_RET(errCode);
}


// Handle the successful completion of one task.
//
// The task is moved to the success list and its succeed node recorded. A task
// without parents finishes (part of) the job: for wait-all levels the level
// counters decide whether the job is complete, otherwise the task becomes the
// job's fetch task. A task with parents feeds its result address to each
// parent and launches every parent whose children are all ready.
//
// Note: no more error processing, handled in function internal
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  int32_t   code = 0;
  int32_t   parentNum = 0;
  bool      moved = false;
  SSchTask *pErrTask = pTask;  // task reported as failed if we bail out to _return

  code = schMoveTaskToSuccList(pJob, pTask, &moved);
  if (code && moved) {
    SCH_ERR_RET(code);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);

  SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));

  if (pTask->parents) {
    parentNum = (int32_t)taosArrayGetSize(pTask->parents);
  }

  if (0 == parentNum) {
    if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
      int32_t taskDone = 0;

      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }

      if (taskDone > pTask->level->taskNum) {
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
      }

      // Every task of the level has reported; one failure fails the job.
      if (pTask->level->taskFailed > 0) {
        SCH_RET(schProcessOnJobFailure(pJob, 0));
      }

      SCH_RET(schProcessOnJobPartialSuccess(pJob));
    }

    pJob->resNode = pTask->succeedAddr;
    pJob->fetchTask = pTask;

    code = schMoveTaskToExecList(pJob, pTask, &moved);
    if (code && moved) {
      SCH_ERR_RET(code);
    }

    SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob));

    return TSDB_CODE_SUCCESS;
  }

  int32_t idx = 0;
  while (idx < parentNum) {
    SSchTask *pParent = *(SSchTask **)taosArrayGet(pTask->parents, idx);
    pErrTask = pParent;

    int32_t readyNum = atomic_add_fetch_32(&pParent->childReady, 1);

    // Tell the parent's plan where this child's result can be fetched from.
    SCH_LOCK(SCH_WRITE, &pParent->lock);
    SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr};
    qSetSubplanExecutionNode(pParent->plan, pTask->plan->id.templateId, &source);
    SCH_UNLOCK(SCH_WRITE, &pParent->lock);

    if (SCH_TASK_READY_TO_LUNCH(readyNum, pParent)) {
      SCH_ERR_RET(schLaunchTask(pJob, pParent));
    }

    ++idx;
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_ERR_RET(schProcessOnTaskFailure(pJob, pErrTask, code));

  SCH_ERR_RET(code);
}

D
dapan1121 已提交
797
// Dispatch one response message received for pTask.
//
//   msgType  response type (TDMT_VND_*_RSP)
//   msg      response payload, may be NULL
//   msgSize  payload size (not used here)
//   rspCode  transport-level result code of the request
//
// A failed response is handed to schProcessOnTaskFailure and its result is
// returned directly: that handler may successfully relaunch the task (return
// 0), in which case the success path below must NOT be executed for this
// response.
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
  int32_t code = 0;

  SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE_RSP: {
        if (rspCode != TSDB_CODE_SUCCESS) {
          SCH_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
        }

        SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

        break;
      }
    case TDMT_VND_SUBMIT_RSP: {
        // TODO: also treat a NULL msg or a non-zero rsp->code as a failure
        // (this stricter check was disabled with "#if 0 //TODO OPEN THIS").
        if (rspCode != TSDB_CODE_SUCCESS) {
          SCH_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
        }

        SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg;
        if (rsp) {
          pJob->resNumOfRows += rsp->affectedRows;
        }

        SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

        break;
      }
    case TDMT_VND_QUERY_RSP: {
        SQueryTableRsp *rsp = (SQueryTableRsp *)msg;

        // Pick a non-zero error code even when only the payload signals the
        // failure, so the failure is never reported with code 0.
        // NOTE(review): rsp->code is used as received; confirm its byte order.
        int32_t errCode = rspCode;
        if (TSDB_CODE_SUCCESS == errCode) {
          errCode = (NULL == rsp) ? TSDB_CODE_QRY_INVALID_INPUT : rsp->code;
        }

        if (errCode != TSDB_CODE_SUCCESS) {
          SCH_RET(schProcessOnTaskFailure(pJob, pTask, errCode));
        }

        SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));

        break;
      }
    case TDMT_VND_RES_READY_RSP: {
        SResReadyRsp *rsp = (SResReadyRsp *)msg;

        // NOTE(review): rsp->code is used as received; confirm its byte order.
        int32_t errCode = rspCode;
        if (TSDB_CODE_SUCCESS == errCode) {
          errCode = (NULL == rsp) ? TSDB_CODE_QRY_INVALID_INPUT : rsp->code;
        }

        if (errCode != TSDB_CODE_SUCCESS) {
          SCH_RET(schProcessOnTaskFailure(pJob, pTask, errCode));
        }

        SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

        break;
      }
    case TDMT_VND_FETCH_RSP: {
        SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;

        if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) {
          int32_t errCode = (rspCode != TSDB_CODE_SUCCESS) ? rspCode : TSDB_CODE_QRY_INVALID_INPUT;
          SCH_RET(schProcessOnTaskFailure(pJob, pTask, errCode));
        }

        // A previous fetch result has not been consumed yet.
        if (pJob->res) {
          SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res);
          tfree(rsp);
          SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
        }

        atomic_store_ptr(&pJob->res, rsp);
        atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));

        if (rsp->completed) {
          SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
        }

        SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

        SCH_ERR_JRET(schProcessOnDataFetched(pJob));
        break;
      }
    case TDMT_VND_DROP_TASK_RSP: {
        // SHOULD NEVER REACH HERE
        SCH_TASK_ELOG("invalid status to handle drop task rsp, ref:%d", atomic_load_32(&pJob->ref));
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
        break;
      }
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask));
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code));

  SCH_RET(code);
}

D
dapan 已提交
903

D
dapan1121 已提交
904
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
D
dapan1121 已提交
905 906
  int32_t code = 0;
  SSchCallbackParam *pParam = (SSchCallbackParam *)param;
D
dapan1121 已提交
907 908
  SSchJob *pJob = NULL;
  SSchTask *pTask = NULL;
D
dapan1121 已提交
909
  
D
dapan1121 已提交
910 911
  SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
  if (NULL == job || NULL == (*job)) {
D
dapan1121 已提交
912
    qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId);
D
dapan1121 已提交
913
    SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
914 915
  }

D
dapan1121 已提交
916 917 918 919 920
  pJob = *job;

  atomic_add_fetch_32(&pJob->ref, 1);

  int32_t s = taosHashGetSize(pJob->execTasks);
D
dapan1121 已提交
921
  if (s <= 0) {
D
dapan1121 已提交
922
    qError("QID:%"PRIx64",TID:%"PRId64" no task in execTask list", pParam->queryId, pParam->taskId);
D
dapan1121 已提交
923 924
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }
H
Haojun Liao 已提交
925

D
dapan1121 已提交
926
  SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
D
dapan1121 已提交
927
  if (NULL == task || NULL == (*task)) {
D
dapan1121 已提交
928
    qError("QID:%"PRIx64",TID:%"PRId64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId);
D
dapan1121 已提交
929 930
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }
D
dapan1121 已提交
931 932

  pTask = *task;
933
  SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode));
D
dapan1121 已提交
934
  
D
dapan1121 已提交
935
  SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
D
dapan1121 已提交
936

H
Haojun Liao 已提交
937
_return:
D
dapan1121 已提交
938 939 940 941 942

  if (pJob) {
    atomic_sub_fetch_32(&pJob->ref, 1);
  }

D
dapan1121 已提交
943 944 945 946
  tfree(param);
  SCH_RET(code);
}

D
dapan1121 已提交
947
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
D
dapan1121 已提交
948 949
  return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
H
Haojun Liao 已提交
950

D
dapan1121 已提交
951
int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
952 953 954
  return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}

D
dapan1121 已提交
955
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
D
dapan1121 已提交
956 957
  return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
H
Haojun Liao 已提交
958

D
dapan1121 已提交
959
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
D
dapan1121 已提交
960 961
  return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
H
Haojun Liao 已提交
962

D
dapan1121 已提交
963
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
D
dapan1121 已提交
964 965
  return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
H
Haojun Liao 已提交
966

D
dapan1121 已提交
967
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {  
D
dapan1121 已提交
968
  SSchCallbackParam *pParam = (SSchCallbackParam *)param;
D
dapan1121 已提交
969
  qDebug("QID:%"PRIx64",TID:%"PRIx64" drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
D
dapan1121 已提交
970 971
}

D
dapan1121 已提交
972
// Select the response callback matching a request message type.
//
//   msgType  request type (TDMT_VND_*)
//   fp       out: callback to register for the request; left untouched when
//            msgType is unknown
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR for an unknown type.
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
  __async_send_cb_fn_t handler = NULL;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
      handler = schHandleCreateTableCallback;
      break;
    case TDMT_VND_SUBMIT:
      handler = schHandleSubmitCallback;
      break;
    case TDMT_VND_QUERY:
      handler = schHandleQueryCallback;
      break;
    case TDMT_VND_RES_READY:
      handler = schHandleReadyCallback;
      break;
    case TDMT_VND_FETCH:
      handler = schHandleFetchCallback;
      break;
    case TDMT_VND_DROP_TASK:
      handler = schHandleDropCallback;
      break;
    default:
      qError("unknown msg type for callback, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  *fp = handler;
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1001
// Wrap a prepared request in an SMsgSendInfo and send it asynchronously.
//
//   transport  transport handle passed to asyncSendMsgToServer
//   epSet      target endpoint set
//   qId, tId   query id / task id stored in the callback parameter
//   msgType    request type, also used to pick the response callback
//   msg        request payload of msgSize bytes
//
// On failure the send info and callback parameter allocated here are freed;
// `msg` itself is not freed by this function.
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
  int32_t              code = 0;
  int64_t              transporterId = 0;
  __async_send_cb_fn_t cbFn = NULL;
  SSchCallbackParam   *cbParam = NULL;

  SMsgSendInfo *sendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (!sendInfo) {
    qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  cbParam = calloc(1, sizeof(SSchCallbackParam));
  if (!cbParam) {
    qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &cbFn));

  // Identify the job/task so the response callback can find them again.
  cbParam->queryId = qId;
  cbParam->taskId = tId;

  sendInfo->fp = cbFn;
  sendInfo->param = cbParam;
  sendInfo->msgType = msgType;
  sendInfo->msgInfo.pData = msg;
  sendInfo->msgInfo.len = msgSize;

  SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, sendInfo));

  qDebug("QID:0x%"PRIx64 ",TID:0x%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType));
  return TSDB_CODE_SUCCESS;

_return:

  tfree(cbParam);
  tfree(sendInfo);
  SCH_RET(code);
}

D
dapan1121 已提交
1043
// Build the request of the given type for pTask and send it asynchronously.
//
//   addr     target node; when NULL the task's current candidate address is
//            used and, after a successful send, recorded as an exec node
//   msgType  request type (TDMT_VND_*)
//
// The request buffer is freed here only if sending fails; pTask->lastMsgType
// is set to msgType before sending and reset to -1 on send failure.
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
  int32_t  code = 0;
  void    *msg = NULL;
  uint32_t msgSize = 0;
  uint32_t sqlLen = 0;
  bool     isCandidateAddr = (NULL == addr);

  if (isCandidateAddr) {
    addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
  }

  SEpSet epSet = addr->epset;

  // Step 1: size of the request; unknown types are rejected before allocating.
  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_SUBMIT:
      msgSize = pTask->msgLen;
      break;
    case TDMT_VND_QUERY:
      sqlLen = strlen(pJob->sql);
      msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + sqlLen;
      break;
    case TDMT_VND_RES_READY:
      msgSize = sizeof(SResReadyReq);
      break;
    case TDMT_VND_FETCH:
      msgSize = sizeof(SResFetchReq);
      break;
    case TDMT_VND_DROP_TASK:
      msgSize = sizeof(STaskDropReq);
      break;
    default:
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
  }

  // Step 2: zeroed request buffer.
  msg = calloc(1, msgSize);
  if (NULL == msg) {
    SCH_TASK_ELOG("calloc %d failed", msgSize);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Step 3: fill in the request; ids go out in network byte order.
  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_SUBMIT:
      // The task already carries the complete request body.
      memcpy(msg, pTask->msg, msgSize);
      break;

    case TDMT_VND_QUERY: {
      SSubQueryMsg *pReq = msg;

      pReq->header.vgId = htonl(addr->nodeId);
      pReq->sId         = htobe64(schMgmt.sId);
      pReq->queryId     = htobe64(pJob->queryId);
      pReq->taskId      = htobe64(pTask->taskId);
      pReq->taskType    = TASK_TYPE_TEMP;
      pReq->phyLen      = htonl(pTask->msgLen);
      pReq->sqlLen      = htonl(sqlLen);

      // Payload layout: sql text followed by the physical plan.
      memcpy(pReq->msg, pJob->sql, sqlLen);
      memcpy(pReq->msg + sqlLen, pTask->msg, pTask->msgLen);
      break;
    }

    case TDMT_VND_RES_READY: {
      SResReadyReq *pReq = msg;

      pReq->header.vgId = htonl(addr->nodeId);
      pReq->sId         = htobe64(schMgmt.sId);
      pReq->queryId     = htobe64(pJob->queryId);
      pReq->taskId      = htobe64(pTask->taskId);
      break;
    }

    case TDMT_VND_FETCH: {
      SResFetchReq *pReq = msg;

      pReq->header.vgId = htonl(addr->nodeId);
      pReq->sId         = htobe64(schMgmt.sId);
      pReq->queryId     = htobe64(pJob->queryId);
      pReq->taskId      = htobe64(pTask->taskId);
      break;
    }

    case TDMT_VND_DROP_TASK: {
      STaskDropReq *pReq = msg;

      pReq->header.vgId = htonl(addr->nodeId);
      pReq->sId         = htobe64(schMgmt.sId);
      pReq->queryId     = htobe64(pJob->queryId);
      pReq->taskId      = htobe64(pTask->taskId);
      break;
    }

    default:
      // Not reachable: unknown types returned in step 1.
      break;
  }

  atomic_store_32(&pTask->lastMsgType, msgType);

  SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));

  if (isCandidateAddr) {
    SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr));
  }

  return TSDB_CODE_SUCCESS;

_return:

  atomic_store_32(&pTask->lastMsgType, -1);

  tfree(msg);
  SCH_RET(code);
}
D
dapan1121 已提交
1167

D
dapan1121 已提交
1168 1169 1170 1171 1172 1173 1174 1175 1176
// Report whether the job is in a state in which no further work should be
// started (failed, cancelled, cancelling or dropping). When pStatus is not
// NULL the job status that was read is stored there.
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  const int8_t jobStatus = SCH_GET_JOB_STATUS(pJob);

  if (NULL != pStatus) {
    *pStatus = jobStatus;
  }

  switch (jobStatus) {
    case JOB_TASK_STATUS_FAILED:
    case JOB_TASK_STATUS_CANCELLED:
    case JOB_TASK_STATUS_CANCELLING:
    case JOB_TASK_STATUS_DROPPING:
      return true;
    default:
      return false;
  }
}
D
dapan1121 已提交
1177

D
dapan1121 已提交
1178 1179

// Note: no more error processing, handled in function internal
D
dapan1121 已提交
1180
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
D
dapan1121 已提交
1181 1182 1183 1184 1185
  int8_t status = 0;
  int32_t code = 0;
  
  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status);
D
dapan1121 已提交
1186 1187 1188 1189
    
    code = atomic_load_32(&pJob->errCode);
    SCH_ERR_RET(code);
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
1190
  }
D
dapan1121 已提交
1191
  
D
dapan1121 已提交
1192
  SSubplan *plan = pTask->plan;
D
dapan1121 已提交
1193

1194
  if (NULL == pTask->msg) { // TODO add more detailed reason for failure
D
dapan1121 已提交
1195 1196
    code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
    if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) {
1197
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen);
D
dapan1121 已提交
1198
      SCH_ERR_JRET(code);
1199
    } else {
1200
      SCH_TASK_DLOG("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
D
dapan1121 已提交
1201
    }
D
dapan1121 已提交
1202
  }
D
dapan1121 已提交
1203 1204
  
  SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
D
dapan1121 已提交
1205

H
Haojun Liao 已提交
1206
  // NOTE: race condition: the task should be put into the hash table before send msg to server
D
dapan 已提交
1207 1208 1209 1210
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
  }
D
dapan1121 已提交
1211

D
dapan1121 已提交
1212
  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
D
dapan1121 已提交
1213
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1214 1215

_return:
D
dapan1121 已提交
1216
  SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code));
D
dapan1121 已提交
1217
  SCH_RET(code);
D
dapan1121 已提交
1218 1219
}

D
dapan1121 已提交
1220 1221
// Start a job: switch it to JOB_TASK_STATUS_EXECUTING and launch every task
// of the level selected by pJob->levelIdx. Stops at the first launch error.
int32_t schLaunchJob(SSchJob *pJob) {
  SSchLevel *pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));

  int32_t taskIdx = 0;
  while (taskIdx < pLevel->taskNum) {
    SSchTask *pTask = taosArrayGet(pLevel->subTasks, taskIdx);
    SCH_ERR_RET(schLaunchTask(pJob, pTask));
    ++taskIdx;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1233 1234 1235 1236 1237
// Send a TDMT_VND_DROP_TASK request to every address recorded in
// pTask->execAddrs. Send results are not checked.
void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
  if (!pTask->execAddrs) {
    SCH_TASK_DLOG("no exec address, status:%d", SCH_GET_TASK_STATUS(pTask));
    return;
  }

  const int32_t addrNum = (int32_t)taosArrayGetSize(pTask->execAddrs);
  if (addrNum <= 0) {
    SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%d", SCH_GET_TASK_STATUS(pTask));
    return;
  }

  int32_t idx = 0;
  while (idx < addrNum) {
    SQueryNodeAddr *pAddr = (SQueryNodeAddr *)taosArrayGet(pTask->execAddrs, idx);

    schBuildAndSendMsg(pJob, pTask, pAddr, TDMT_VND_DROP_TASK);
    ++idx;
  }

  SCH_TASK_DLOG("task has %d exec address", addrNum);
}

// Walk a task hash list and drop, on its executed nodes, every task for which
// SCH_TASK_NO_NEED_DROP is false.
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  for (void *pIter = taosHashIterate(list, NULL); pIter != NULL; pIter = taosHashIterate(list, pIter)) {
    SSchTask *pTask = *(SSchTask **)pIter;

    if (SCH_TASK_NO_NEED_DROP(pTask)) {
      continue;
    }

    schDropTaskOnExecutedNode(pJob, pTask);
  }
}
H
Haojun Liao 已提交
1268

D
dapan1121 已提交
1269 1270 1271 1272
// Drop the tasks of a job from all three task lists, in the order
// executing, succeeded, failed.
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *taskLists[] = {pJob->execTasks, pJob->succTasks, pJob->failTasks};

  for (size_t i = 0; i < sizeof(taskLists) / sizeof(taskLists[0]); ++i) {
    schDropTaskInHashList(pJob, taskLists[i]);
  }
}
1274

1275
// Create a job from a query DAG, register it in schMgmt.jobs and launch it.
//
//   transport     transport handle stored on the job
//   pNodeList     optional list of exec nodes; duplicated into the job
//   pDag          query DAG to validate and build the job from
//   job           out: the created job, or NULL on failure
//   sql           sql text; the pointer is stored on the job, not copied
//   syncSchedule  when true, block on the job's response semaphore
//
// On any failure after allocation the job is released via schedulerFreeJob.
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, const char* sql, bool syncSchedule) {
  int32_t  code = 0;
  SSchJob *pJob = NULL;

  qDebug("QID:0x%"PRIx64" job started", pDag->queryId);

  if (NULL == pNodeList || taosArrayGetSize(pNodeList) <= 0) {
    qDebug("QID:0x%"PRIx64" input exec nodeList is empty", pDag->queryId);
  }

  pJob = calloc(1, sizeof(SSchJob));
  if (!pJob) {
    qError("QID:%"PRIx64" calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->attr.syncSchedule = syncSchedule;
  pJob->transport = transport;
  pJob->sql = sql;

  if (pNodeList) {
    pJob->nodeList = taosArrayDup(pNodeList);
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));

  // One hash list per task state, each sized for all subplans of the DAG.
  pJob->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (!pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (!pJob->succTasks) {
    SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (!pJob->failTasks) {
    SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&pJob->rspSem, 0, 0);

  // Register the job so that response callbacks can find it by query id.
  code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES);
  if (code != 0) {
    if (HASH_NODE_EXIST(code)) {
      SCH_JOB_ELOG("job already exist, isQueryJob:%d", pJob->attr.queryJob);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    SCH_JOB_ELOG("taosHashPut job failed, errno:%d", errno);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->status = JOB_TASK_STATUS_NOT_START;
  SCH_ERR_JRET(schLaunchJob(pJob));

  *job = pJob;

  if (syncSchedule) {
    SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob));
  return TSDB_CODE_SUCCESS;

_return:

  *job = NULL;
  schedulerFreeJob(pJob);
  SCH_RET(code);
}
D
dapan1121 已提交
1349

D
dapan1121 已提交
1350 1351 1352 1353 1354 1355 1356
// Cancel a job. Not implemented yet: currently does nothing and reports
// success.
int32_t schCancelJob(SSchJob *pJob) {
  //TODO

  //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST

  // The function is declared int32_t; without an explicit return a caller
  // reading the result would observe an indeterminate value.
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379

// Initialize the global scheduler state (schMgmt).
//
//   cfg  optional configuration; when NULL, or when its maxJobNum is 0,
//        SCHEDULE_DEFAULT_JOB_NUMBER is used as the job capacity
//
// Returns TSDB_CODE_QRY_INVALID_INPUT if the job table already exists,
// TSDB_CODE_QRY_OUT_OF_MEMORY if the job table cannot be created, and
// TSDB_CODE_QRY_SYS_ERROR if the scheduler id cannot be generated.
int32_t schedulerInit(SSchedulerCfg *cfg) {
  if (NULL != schMgmt.jobs) {
    qError("scheduler already initialized");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (NULL != cfg) {
    schMgmt.cfg = *cfg;
  }

  if (NULL == cfg || 0 == schMgmt.cfg.maxJobNum) {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
  }

  schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (!schMgmt.jobs) {
    qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // The scheduler id is filled in directly from the system UUID bytes.
  if (0 != taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
    qError("generate schdulerId failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
  }

  qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);

  return TSDB_CODE_SUCCESS;
}

1390
// Execute a job synchronously (schExecJobImpl with syncSchedule = true) and
// copy the job's error code and row count into pRes.
//
// Returns TSDB_CODE_QRY_INVALID_INPUT when transport, pDag, pDag->pSubplans,
// pJob or pRes is NULL.
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes) {
  bool badInput = (NULL == transport) || (NULL == pDag) || (NULL == pDag->pSubplans) || (NULL == pJob) || (NULL == pRes);
  if (badInput) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));

  struct SSchJob *pDone = *pJob;
  pRes->code = atomic_load_32(&pDone->errCode);
  pRes->numOfRows = pDone->resNumOfRows;

  return TSDB_CODE_SUCCESS;
}

1402
// Execute a job asynchronously (schExecJobImpl with syncSchedule = false).
//
// Returns TSDB_CODE_QRY_INVALID_INPUT when transport, pDag, pDag->pSubplans or
// pJob is NULL.
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob) {
  bool badInput = (NULL == transport) || (NULL == pDag) || (NULL == pDag->pSubplans) || (NULL == pJob);
  if (badInput) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, false));

  return TSDB_CODE_SUCCESS;
}

1411
// Convert a single-level query DAG into a list of STaskInfo entries, one per
// subplan, each carrying a freshly allocated SSubQueryMsg that embeds the
// serialized plan.
//
//   pDag    DAG with exactly one level and at least one subplan
//   pTasks  out: array of STaskInfo; only written on success
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a bad DAG,
// TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure, or the plan
// serialization error. On failure the partially built list is released via
// schedulerFreeTaskList.
int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
  if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) <= 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = taosArrayGetSize(pDag->pSubplans);
  if (1 != levelNum) {
    qError("invalid level num: %d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SArray *plans = taosArrayGet(pDag->pSubplans, 0);
  int32_t taskNum = taosArrayGetSize(plans);
  if (taskNum <= 0) {
    qError("invalid task num: %d", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo));
  if (NULL == info) {
    qError("taosArrayInit %d taskInfo failed", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  STaskInfo tInfo = {0};
  char *msg = NULL;
  int32_t msgLen = 0;
  int32_t code = 0;

  for (int32_t i = 0; i < taskNum; ++i) {
    SSubplan *plan = taosArrayGetP(plans, i);
    tInfo.addr = plan->execNode;

    code = qSubPlanToString(plan, &msg, &msgLen);
    if (TSDB_CODE_SUCCESS != code || NULL == msg || msgLen <= 0) {
      qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);

      // A success code with an empty/NULL plan must still abort;
      // SCH_ERR_JRET only jumps on a non-zero code.
      if (TSDB_CODE_SUCCESS == code) {
        code = TSDB_CODE_SCH_INTERNAL_ERROR;
      }

      SCH_ERR_JRET(code);
    }

    int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;

    SSubQueryMsg* pMsg = calloc(1, msgSize);
    if (NULL == pMsg) {
      qError("calloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(pMsg->msg, msg, msgLen);

    // The plan string has been copied into the request; release it so it is
    // not leaked when the next iteration serializes another plan.
    free(msg);
    msg = NULL;

    pMsg->header.vgId = tInfo.addr.nodeId;

    pMsg->sId      = schMgmt.sId;
    pMsg->queryId  = plan->id.queryId;
    pMsg->taskId   = schGenUUID();
    pMsg->taskType = TASK_TYPE_PERSISTENT;
    pMsg->phyLen   = msgLen;
    pMsg->sqlLen   = 0;

    tInfo.msg = pMsg;

    if (NULL == taosArrayPush(info, &tInfo)) {
      qError("taosArrayPush failed, idx:%d", i);
      // The request never made it into the list, so it must be freed here.
      free(pMsg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  *pTasks = info;
  info = NULL;

_return:
  // msg is non-NULL only when an error interrupted the loop before the plan
  // string was released; free(NULL) is a no-op.
  free(msg);
  schedulerFreeTaskList(info);
  SCH_RET(code);
}

D
dapan1121 已提交
1486 1487 1488 1489 1490 1491
/**
 * Create copyNum copies of a task, each with its own message buffer and a
 * newly generated task id.
 *
 * @param src      Task to copy; both src and src->msg must be non-NULL.
 * @param dst      Output. On success receives a new array of STaskInfo that
 *                 the caller releases with schedulerFreeTaskList(). Set to
 *                 NULL when the copy fails after argument validation.
 * @param copyNum  Number of copies to create; must be positive.
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code.
 */
int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
  // src->msg is dereferenced below, so it is validated with the other inputs.
  if (NULL == src || NULL == src->msg || NULL == dst || copyNum <= 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;

  *dst = taosArrayInit(copyNum, sizeof(STaskInfo));
  if (NULL == *dst) {
    qError("taosArrayInit %d taskInfo failed", copyNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Message header plus the serialized plan that follows it.
  int32_t msgSize = src->msg->phyLen + sizeof(*src->msg);
  STaskInfo info = {0};

  info.addr = src->addr;

  for (int32_t i = 0; i < copyNum; ++i) {
    info.msg = malloc(msgSize);
    if (NULL == info.msg) {
      qError("malloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(info.msg, src->msg, msgSize);

    // Each copy gets its own task id.
    info.msg->taskId = schGenUUID();

    if (NULL == taosArrayPush(*dst, &info)) {
      qError("taosArrayPush failed, idx:%d", i);
      // This buffer is not in the array, so it has to be released here.
      free(info.msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  // Release every copy made so far and do not hand out a partial list.
  schedulerFreeTaskList(*dst);
  *dst = NULL;

  SCH_RET(code);
}

D
dapan1121 已提交
1532

D
dapan1121 已提交
1533
/**
 * Fetch the next result block of a job, blocking until a response arrives.
 *
 * Only one fetch may be in flight per job; pJob->userFetch is used as the
 * "fetch in progress" flag. Requests that are rejected before that flag is
 * acquired (job dropping, job needs no fetch, another fetch still running)
 * leave the flag and pJob->res alone, so an in-progress fetch owned by
 * another caller is not disturbed.
 *
 * @param pJob   Job to fetch from.
 * @param pData  Output. Once the fetch flag has been acquired it receives the
 *               job's current result (ownership moves to the caller), or a
 *               newly allocated SRetrieveTableRsp with completed set when no
 *               result is available (NULL if that allocation fails). Set to
 *               NULL when the request is rejected before the flag is acquired.
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code.
 */
int32_t schedulerFetchRows(SSchJob *pJob, void** pData) {
  if (NULL == pJob || NULL == pData) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;

  // Hold a reference for the duration of the call; schedulerFreeJob() waits
  // for the count to reach zero before releasing the job.
  atomic_add_fetch_32(&pJob->ref, 1);

  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (status == JOB_TASK_STATUS_DROPPING) {
    SCH_JOB_ELOG("job is dropping, status:%d", status);
    code = TSDB_CODE_SCH_STATUS_ERROR;
    goto _release;
  }

  if (!SCH_JOB_NEED_FETCH(&pJob->attr)) {
    SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob));
    code = TSDB_CODE_QRY_APP_ERROR;
    goto _release;
  }

  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
    code = TSDB_CODE_QRY_APP_ERROR;
    goto _release;
  }

  // From here on this call owns the fetch flag; every exit goes through
  // _return so that the flag is cleared again.
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
    SCH_JOB_ELOG("job failed or dropping, status:%d", status);
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
  } else if (status == JOB_TASK_STATUS_SUCCEED) {
    SCH_JOB_ELOG("job already succeed, status:%d", status);
    goto _return;
  } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
    SCH_ERR_JRET(schFetchFromRemote(pJob));
  }

  tsem_wait(&pJob->rspSem);

  // The job state may have changed while waiting for the response.
  status = SCH_GET_JOB_STATUS(pJob);

  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
    SCH_JOB_ELOG("job failed or dropping, status:%d", status);
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
  }

  if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) {
    SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
  }

_return:

  // Atomically take the result out of the job: retry until the pointer that
  // was read is the one that got replaced by NULL.
  while (true) {
    *pData = atomic_load_ptr(&pJob->res);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->res, *pData, NULL)) {
      continue;
    }

    break;
  }

  if (NULL == *pData) {
    // Nothing to return: hand back an empty response marked as completed.
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
    if (rsp) {
      rsp->completed = 1;
    }

    *pData = rsp;
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
  }

  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);

  SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
  atomic_sub_fetch_32(&pJob->ref, 1);

  SCH_RET(code);

_release:

  // Rejected before the fetch flag was acquired: only report "no data" and
  // drop the reference taken above.
  *pData = NULL;
  atomic_sub_fetch_32(&pJob->ref, 1);

  SCH_RET(code);
}
D
dapan1121 已提交
1608

D
dapan1121 已提交
1609 1610
int32_t scheduleCancelJob(void *job) {
  SSchJob *pJob = (SSchJob *)job;
D
dapan1121 已提交
1611

D
dapan1121 已提交
1612
  atomic_add_fetch_32(&pJob->ref, 1);
D
dapan1121 已提交
1613

D
dapan1121 已提交
1614 1615 1616 1617 1618
  int32_t code = schCancelJob(pJob);

  atomic_sub_fetch_32(&pJob->ref, 1);

  SCH_RET(code);
D
dapan1121 已提交
1619 1620
}

D
dapan1121 已提交
1621
void schedulerFreeJob(void *job) {
D
dapan1121 已提交
1622
  if (NULL == job) {
D
dapan 已提交
1623 1624
    return;
  }
D
dapan1121 已提交
1625

D
dapan1121 已提交
1626
  SSchJob *pJob = job;
D
dapan1121 已提交
1627
  uint64_t queryId = pJob->queryId;
D
dapan1121 已提交
1628
  bool setJobFree = false;
D
dapan1121 已提交
1629

D
dapan1121 已提交
1630 1631 1632 1633 1634
  if (SCH_GET_JOB_STATUS(pJob) > 0) {
    if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) {
      SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob);
      return;
    }
D
dapan1121 已提交
1635

D
dapan1121 已提交
1636
    SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref));
D
dapan1121 已提交
1637

D
dapan1121 已提交
1638 1639 1640 1641 1642
    while (true) {
      int32_t ref = atomic_load_32(&pJob->ref);
      if (0 == ref) {
        break;
      } else if (ref > 0) {
D
dapan1121 已提交
1643 1644 1645 1646
        if (1 == ref && atomic_load_8(&pJob->userFetch) > 0 && !setJobFree) {
          schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
          setJobFree = true;
        }
H
Haojun Liao 已提交
1647

D
dapan1121 已提交
1648 1649
        usleep(1);
      } else {
D
dapan1121 已提交
1650 1651
        SCH_JOB_ELOG("invalid job ref number, ref:%d", ref);
        break;
D
dapan1121 已提交
1652
      }
D
dapan1121 已提交
1653
    }
D
dapan1121 已提交
1654

D
dapan1121 已提交
1655
    SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob));
D
dapan1121 已提交
1656

D
dapan1121 已提交
1657 1658 1659
    if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
      schCancelJob(pJob);
    }
1660

D
dapan1121 已提交
1661 1662
    schDropJobAllTasks(pJob);
  }
D
dapan1121 已提交
1663 1664 1665 1666

  pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
  
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
1667
  for(int32_t i = 0; i < numOfLevels; ++i) {
D
dapan1121 已提交
1668
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
1669 1670 1671 1672

    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
    for(int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask* pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
1673
      schFreeTask(pTask);
1674 1675 1676 1677
    }

    taosArrayDestroy(pLevel->subTasks);
  }
D
dapan1121 已提交
1678
  
D
dapan1121 已提交
1679 1680 1681
  taosHashCleanup(pJob->execTasks);
  taosHashCleanup(pJob->failTasks);
  taosHashCleanup(pJob->succTasks);
D
dapan1121 已提交
1682
  
D
dapan1121 已提交
1683
  taosArrayDestroy(pJob->levels);
D
dapan1121 已提交
1684
  taosArrayDestroy(pJob->nodeList);
D
dapan1121 已提交
1685 1686

  tfree(pJob->res);
D
dapan1121 已提交
1687
  
D
dapan1121 已提交
1688
  tfree(pJob);
D
dapan1121 已提交
1689

1690
  qDebug("QID:0x%"PRIx64" job freed", queryId);
D
dapan1121 已提交
1691
}
D
dapan1121 已提交
1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705

/**
 * Free a task list: the message of every STaskInfo entry, then the array.
 *
 * @param taskList  Array of STaskInfo; NULL is ignored.
 */
void schedulerFreeTaskList(SArray *taskList) {
  if (taskList != NULL) {
    int32_t total = taosArrayGetSize(taskList);
    int32_t idx = 0;

    while (idx < total) {
      STaskInfo *entry = taosArrayGet(taskList, idx);
      tfree(entry->msg);
      ++idx;
    }

    taosArrayDestroy(taskList);
  }
}
D
dapan1121 已提交
1706 1707 1708 1709 1710 1711 1712 1713
  
/**
 * Tear down the scheduler's job table, if there is one, and clear the pointer
 * to it.
 */
void schedulerDestroy(void) {
  if (!schMgmt.jobs) {
    return;
  }

  taosHashCleanup(schMgmt.jobs); //TODO
  schMgmt.jobs = NULL;
}