scheduler.c 28.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

D
dapan1121 已提交
16
#include "schedulerInt.h"
H
Hongze Cheng 已提交
17
#include "tmsg.h"
18
#include "query.h"
D
dapan 已提交
19
#include "catalog.h"
20 21

/*
 * Process-wide scheduler state. schedulerInit() fills in cfg, the jobs hash
 * and sId; taskId is a counter bumped with atomic_add_fetch_64() to hand out
 * task ids when jobs are built.
 */
SSchedulerMgmt schMgmt = {0};
D
dapan1121 已提交
22 23


D
dapan 已提交
24
/*
 * Wire up parent/child links between the tasks of a freshly built job.
 *
 * For every task, the subplan's pChildern/pParents lists are translated into
 * task->children/task->parents arrays of SSchTask pointers, using planToTask
 * (keyed by the SSubplan pointer value, value is the SSchTask pointer).
 *
 * Afterwards the root level (levels[0]) is validated: a query job may have at
 * most one root task, and no root task may have parents.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation
 * failure, or TSDB_CODE_SCH_INTERNAL_ERROR on an inconsistent plan.
 */
int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
  for (int32_t i = 0; i < job->levelNum; ++i) {
    SSchLevel *level = taosArrayGet(job->levels, i);

    for (int32_t m = 0; m < level->taskNum; ++m) {
      SSchTask *task = taosArrayGet(level->subTasks, m);
      SSubplan *plan = task->plan;
      int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
      int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;

      if (childNum > 0) {
        task->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == task->children) {
          qError("taosArrayInit %d failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        // The array element holds the SSubplan*; its bytes are the hash key.
        SSubplan **child = taosArrayGet(plan->pChildern, n);
        SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->children, childTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        task->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == task->parents) {
          qError("taosArrayInit %d failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan **parent = taosArrayGet(plan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          qError("subplan relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->parents, parentTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    }
  }

  // levels[0] holds the root tasks of the DAG; it must exist to be validated.
  if (job->levelNum <= 0) {
    qError("invalid plan info, level num:%d", job->levelNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SSchLevel *level = taosArrayGet(job->levels, 0);
  if (job->attr.queryJob && level->taskNum > 1) {
    qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  // Every root task (not only the first one) must be parentless.
  for (int32_t m = 0; m < level->taskNum; ++m) {
    SSchTask *task = taosArrayGet(level->subTasks, m);
    int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0;
    if (parentNum > 0) {
      qError("invalid plan info, level 0, parentNum:%d", parentNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
97
/*
 * Validate a query DAG and materialize it into `job`.
 *
 * One SSchLevel is created per entry of dag->pSubplans and one SSchTask per
 * subplan in that level; task ids come from schMgmt.taskId. The subplan ->
 * task mapping is then used by schBuildTaskRalation() to link parents and
 * children. job->attr.needFetch is cleared if any subplan is
 * QUERY_TYPE_MODIFY, and job->attr.queryJob is set if any subplan is not.
 *
 * On failure everything allocated here (job->levels, every level's subTasks
 * and any per-task children/parents arrays) is released, and job->levels /
 * job->levelNum are reset so the job never holds dangling pointers.
 */
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
  int32_t code = 0;

  job->queryId = dag->queryId;

  if (dag->numOfSubplans <= 0) {
    qError("invalid subplan num:%d", dag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
  if (levelNum <= 0) {
    qError("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == planToTask) {
    qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Assigned before the first jump to _return, which relies on it.
  job->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == job->levels) {
    qError("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->attr.needFetch = true;

  job->levelNum = levelNum;
  job->levelIdx = levelNum - 1;

  job->subPlans = dag->pSubplans;

  SSchLevel level = {0};
  SArray *levelPlans = NULL;
  int32_t levelPlanNum = 0;
  SSchLevel *pLevel = NULL;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    // Capacity was reserved up front, so element addresses stay stable and
    // task.level may point into job->levels.
    if (NULL == taosArrayPush(job->levels, &level)) {
      qError("taosArrayPush failed");
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(job->levels, i);

    pLevel->level = i;

    levelPlans = taosArrayGetP(dag->pSubplans, i);
    if (NULL == levelPlans) {
      qError("no level plans for level %d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
    if (levelPlanNum <= 0) {
      qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = levelPlanNum;

    pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      qError("taosArrayInit %d failed", levelPlanNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < levelPlanNum; ++n) {
      SSubplan *plan = taosArrayGetP(levelPlans, n);
      SSchTask task = {0};

      if (plan->type == QUERY_TYPE_MODIFY) {
        job->attr.needFetch = false;
      } else {
        job->attr.queryJob = true;
      }

      task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
      task.plan = plan;
      task.level = pLevel;
      task.status = JOB_TASK_STATUS_NOT_START;

      // Capacity reserved above: p stays valid, so it can be stored in the hash.
      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        qError("taosArrayPush failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        qError("taosHashPut failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

  SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));

  taosHashCleanup(planToTask);

  return TSDB_CODE_SUCCESS;

_return:
  // Release every level built so far (not just the last one). pLevel cannot be
  // used here: it is still NULL if the failure happened before the loop.
  if (job->levels) {
    int32_t builtLevelNum = (int32_t)taosArrayGetSize(job->levels);
    for (int32_t i = 0; i < builtLevelNum; ++i) {
      SSchLevel *builtLevel = taosArrayGet(job->levels, i);
      if (NULL == builtLevel->subTasks) {
        continue;
      }

      int32_t builtTaskNum = (int32_t)taosArrayGetSize(builtLevel->subTasks);
      for (int32_t m = 0; m < builtTaskNum; ++m) {
        SSchTask *builtTask = taosArrayGet(builtLevel->subTasks, m);
        if (builtTask->children) {
          taosArrayDestroy(builtTask->children);
        }
        if (builtTask->parents) {
          taosArrayDestroy(builtTask->parents);
        }
      }

      taosArrayDestroy(builtLevel->subTasks);
    }

    taosArrayDestroy(job->levels);
    job->levels = NULL;
    job->levelNum = 0;
  }

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan 已提交
218
/*
 * Fill candidate execution endpoints into epSet for a task whose plan has
 * none. The job's qnodes (job->qnodeList, when present) come first, then the
 * endpoints recorded from data-source tasks (job->dataSrcEps), until epSet is
 * full. If epSet already holds SCH_MAX_CONDIDATE_EP_NUM entries it is left
 * untouched.
 *
 * fqdn copies are bounded by the destination slot and always NUL-terminated.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) {
  if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
    return TSDB_CODE_SUCCESS;
  }

  const size_t fqdnSize = sizeof(epSet->fqdn[0]);
  int32_t qnodeNum = job->qnodeList ? (int32_t)taosArrayGetSize(job->qnodeList) : 0;

  for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) {
    SEpAddr *addr = taosArrayGet(job->qnodeList, i);

    strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, fqdnSize - 1);
    epSet->fqdn[epSet->numOfEps][fqdnSize - 1] = 0;
    epSet->port[epSet->numOfEps] = addr->port;

    ++epSet->numOfEps;
  }

  for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) {
    strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], fqdnSize - 1);
    epSet->fqdn[epSet->numOfEps][fqdnSize - 1] = 0;
    epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];

    ++epSet->numOfEps;
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
243

D
dapan1121 已提交
244

D
dapan 已提交
245
/*
 * Register `task` as in flight: store its pointer in job->execTasks keyed by
 * task->taskId. Fails with TSDB_CODE_QRY_OUT_OF_MEMORY if the hash insert
 * fails.
 */
int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) {
  int32_t putRet = taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (putRet != 0) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
254
/*
 * Move `task` from job->execTasks to job->succTasks.
 *
 * If the task is not found in execTasks, a warning is logged and the call
 * succeeds without touching *moved. *moved is set to true only after the
 * task has been inserted into succTasks.
 */
int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
  int32_t removeRet = taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId));
  if (removeRet != 0) {
    qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
    return TSDB_CODE_SUCCESS;
  }

  int32_t putRet = taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (putRet != 0) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
270
/*
 * Move `task` into job->failTasks.
 *
 * The removal from job->execTasks is best effort: a miss is only logged.
 * The insert into failTasks must succeed (otherwise
 * TSDB_CODE_QRY_OUT_OF_MEMORY). On success *moved is set to true.
 */
int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
  int32_t removeRet = taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId));
  if (removeRet != 0) {
    qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId);
  }

  int32_t putRet = taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (putRet != 0) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
285
/*
 * Decide whether a failed task should be retried.
 *
 * Retry is not implemented yet: *needRetry is always set to false and the
 * call always succeeds.
 * TODO: base the decision on task type, errCode, retry count, job status and
 * available endpoints, and record retry info on the task when retrying.
 */
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
  (void)job;
  (void)task;
  (void)errCode;

  *needRetry = false;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
294

D
dapan 已提交
295
/*
 * Start a remote FETCH for job->fetchTask unless one is already in flight.
 *
 * job->remoteFetch acts as an in-flight flag: it is switched 0 -> 1 with a
 * compare-and-swap before sending. If the send fails, the flag is switched
 * back to 0 and the send error is returned.
 */
int32_t schFetchFromRemote(SSchJob *job) {
  int32_t code = 0;

  int32_t prevFetchFlag = atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1);
  if (0 != prevFetchFlag) {
    qInfo("prior fetching not finished");
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schBuildAndSendMsg(job, job->fetchTask, TDMT_VND_FETCH));
  return TSDB_CODE_SUCCESS;

_return:
  // Undo the in-flight marker so a later fetch can try again.
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
  return code;
}

D
dapan 已提交
313

D
dapan 已提交
314
/*
 * Mark the job as partially succeeded (root task finished).
 *
 * A synchronous job that needs no fetch is woken via job->rspSem. If the user
 * is already waiting on a fetch (job->userFetch), a remote fetch is started.
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
  job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;

  bool wakeSyncWaiter = job->attr.syncSchedule && !job->attr.needFetch;
  if (wakeSyncWaiter) {
    tsem_post(&job->rspSem);
  }

  if (!job->userFetch) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schFetchFromRemote(job));
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
328
/*
 * Mark the job as failed with errCode.
 *
 * Clears the in-flight remote fetch flag. Wakes job->rspSem if a user fetch
 * is waiting, or if this is a synchronous job that needs no fetch.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
  job->status = JOB_TASK_STATUS_FAILED;
  job->errCode = errCode;

  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  bool syncWithoutFetch = job->attr.syncSchedule && !job->attr.needFetch;
  if (job->userFetch || syncWithoutFetch) {
    tsem_post(&job->rspSem);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
341
/*
 * Called once a FETCH response has been stored on the job: clear the
 * in-flight fetch flag and wake the thread waiting on job->rspSem.
 *
 * Always returns TSDB_CODE_SUCCESS. The caller checks the result, so the
 * function must return a value on every path (it previously fell off the end,
 * which is undefined behavior).
 */
int32_t schProcessOnDataFetched(SSchJob *job) {
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  tsem_post(&job->rspSem);

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
348
/*
 * Handle successful completion of `task`.
 *
 * The task is moved from execTasks to succTasks. If it was not in execTasks
 * (already handled), nothing else happens.
 *
 * A task without parents must be at plan level 0.
 *  - If its level waits for all tasks (SCH_TASK_NEED_WAIT_ALL), the success
 *    is counted under the level lock. The job proceeds only once every task
 *    of the level is done, and any failure in the level fails the job.
 *  - Otherwise the task's exec address is copied into job->resEp.
 * In both cases the task then becomes job->fetchTask and the job is marked
 * partially succeeded.
 *
 * A task with parents:
 *  - if it is a data-source task and there is room, records its exec address
 *    in job->dataSrcEps;
 *  - bumps each parent's childReady and sets this task's exec address on the
 *    parent's subplan;
 *  - launches each parent that is ready.
 *
 * fqdn copies are bounded by the destination buffer and NUL-terminated.
 */
int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
  bool moved = false;

  SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
  if (!moved) {
    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
    return TSDB_CODE_SUCCESS;
  }

  task->status = JOB_TASK_STATUS_SUCCEED;

  int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0;
  if (parentNum == 0) {
    if (task->plan->level != 0) {
      qError("level error");
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    if (SCH_TASK_NEED_WAIT_ALL(task)) {
      int32_t taskDone = 0;
      int32_t taskFailed = 0;

      // Snapshot both counters under the lock, so the failure check below
      // matches the done count this thread observed.
      SCH_LOCK(SCH_WRITE, &task->level->lock);
      task->level->taskSucceed++;
      taskFailed = task->level->taskFailed;
      taskDone = task->level->taskSucceed + taskFailed;
      SCH_UNLOCK(SCH_WRITE, &task->level->lock);

      if (taskDone < task->level->taskNum) {
        qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }

      if (taskFailed > 0) {
        job->status = JOB_TASK_STATUS_FAILED;
        SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR));

        return TSDB_CODE_SUCCESS;
      }
    } else {
      const size_t resFqdnSize = sizeof(job->resEp.fqdn);
      strncpy(job->resEp.fqdn, task->execAddr.fqdn, resFqdnSize - 1);
      job->resEp.fqdn[resFqdnSize - 1] = 0;
      job->resEp.port = task->execAddr.port;
    }

    job->fetchTask = task;

    SCH_ERR_RET(schProcessOnJobPartialSuccess(job));

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
    const size_t srcFqdnSize = sizeof(job->dataSrcEps.fqdn[0]);
    strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, srcFqdnSize - 1);
    job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps][srcFqdnSize - 1] = 0;
    job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;

    ++job->dataSrcEps.numOfEps;
  }

  for (int32_t i = 0; i < parentNum; ++i) {
    SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i);

    ++par->childReady;

    SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));

    if (SCH_TASK_READY_TO_LUNCH(par)) {
      SCH_ERR_RET(schLaunchTask(job, par));
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
419
/*
 * Handle failure of `task` with errCode.
 *
 * If schTaskCheckAndSetRetry() requests a retry, the task is launched again.
 * Otherwise:
 *  - the task is moved to job->failTasks;
 *  - if its level waits for all tasks, the failure is counted under the level
 *    lock, and nothing more happens until every task of the level is done;
 *  - the job is marked failed via schProcessOnJobFailure().
 */
int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
  bool needRetry = false;

  SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_RET(schLaunchTask(job, task));
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);

  bool moved = false;
  SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
  if (!moved) {
    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
  }

  if (SCH_TASK_NEED_WAIT_ALL(task)) {
    SSchLevel *taskLevel = task->level;
    int32_t doneNum = 0;

    SCH_LOCK(SCH_WRITE, &taskLevel->lock);
    taskLevel->taskFailed++;
    doneNum = taskLevel->taskSucceed + taskLevel->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &taskLevel->lock);

    if (doneNum < taskLevel->taskNum) {
      qDebug("wait all tasks, done:%d, all:%d", doneNum, taskLevel->taskNum);
      return TSDB_CODE_SUCCESS;
    }
  }

  job->status = JOB_TASK_STATUS_FAILED;
  SCH_ERR_RET(schProcessOnJobFailure(job, errCode));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
456
/*
 * Dispatch a vnode response for `task` of `job`.
 *
 * SUBMIT/QUERY/RES_READY responses drive the task:
 *  - a transport error (rspCode != 0), an empty body, or a non-zero rsp->code
 *    fails the task;
 *  - otherwise QUERY sends RES_READY, and SUBMIT / RES_READY mark the task
 *    succeeded.
 * FETCH responses store the retrieved block on the job and wake the fetcher.
 *
 * Errors while advancing a task go to schProcessOnTaskFailure (_task_error).
 * Errors that cannot be handled per task fail the whole job (_return).
 *
 * @param msg     response body; may be NULL when rspCode reports an error
 * @param msgSize size of msg (currently unused)
 * @param rspCode transport-level result for this response
 */
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
  int32_t code = 0;

  (void)msgSize;

  bool isTaskRsp = (TDMT_VND_SUBMIT_RSP == msgType || TDMT_VND_QUERY_RSP == msgType || TDMT_VND_RES_READY_RSP == msgType);
  if (isTaskRsp && (rspCode != TSDB_CODE_SUCCESS || NULL == msg)) {
    // No usable body: rsp->code cannot be read, so fail the task directly.
    int32_t errCode = (rspCode != TSDB_CODE_SUCCESS) ? rspCode : TSDB_CODE_SCH_INTERNAL_ERROR;
    qError("invalid rsp, msgType:%d, rspCode:%x", msgType, rspCode);
    SCH_ERR_JRET(schProcessOnTaskFailure(job, task, errCode));
    return TSDB_CODE_SUCCESS;
  }

  switch (msgType) {
    case TDMT_VND_SUBMIT_RSP: {
      SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
      if (rsp->code != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
      } else {
        job->resNumOfRows += rsp->affectedRows;

        code = schProcessOnTaskSuccess(job, task);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TDMT_VND_QUERY_RSP: {
      SQueryTableRsp *rsp = (SQueryTableRsp *)msg;

      if (rsp->code != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
      } else {
        code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TDMT_VND_RES_READY_RSP: {
      SResReadyRsp *rsp = (SResReadyRsp *)msg;

      if (rsp->code != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
      } else {
        code = schProcessOnTaskSuccess(job, task);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TDMT_VND_FETCH_RSP: {
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        qError("fetch rsp msg is NULL");
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;

      job->res = rsp;
      job->resNumOfRows = rsp->numOfRows;

      SCH_ERR_JRET(schProcessOnDataFetched(job));
      break;
    }
    case TDMT_VND_DROP_TASK: {
      // Nothing to process yet. The explicit break keeps this case from
      // falling into `default` and being rejected as an unknown type.
      break;
    }
    default:
      qError("unknown msg type:%d received", msgType);
      return TSDB_CODE_QRY_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;

_task_error:
  SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code));
  return TSDB_CODE_SUCCESS;

_return:
  code = schProcessOnJobFailure(job, code);
  return code;
}

D
dapan 已提交
529

D
dapan1121 已提交

/*
 * Common transport callback for SUBMIT/QUERY/RES_READY/FETCH responses.
 *
 * Looks up the job (schMgmt.jobs) and the in-flight task (job->execTasks)
 * identified by the SSchCallbackParam, then passes the response to
 * schProcessRspMsg(). `param` is always freed here.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR if the job or task is unknown.
 * Otherwise it returns the result of schProcessRspMsg(), which was
 * previously discarded.
 */
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
  int32_t code = 0;
  SSchCallbackParam *pParam = (SSchCallbackParam *)param;

  SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
  if (NULL == job || NULL == (*job)) {
    qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId);
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId));
  if (NULL == task || NULL == (*task)) {
    qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId);
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  // NOTE(review): unclear whether the transport can pass pMsg == NULL on
  // error; guard it rather than dereference blindly.
  code = schProcessRspMsg(*job, *task, msgType, pMsg ? pMsg->pData : NULL, pMsg ? pMsg->len : 0, rspCode);

_return:
  tfree(param);

  SCH_RET(code);
}

/*
 * Per-request-type transport callbacks. Each one forwards to
 * schHandleCallback(), tagging the response with the matching *_RSP type so
 * schProcessRspMsg() can dispatch it.
 */

/* Response to a TDMT_VND_SUBMIT request. */
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}

/* Response to a TDMT_VND_QUERY request. */
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}

/* Response to a TDMT_VND_FETCH request. */
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}

/* Response to a TDMT_VND_RES_READY request. */
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {  
  SSchCallbackParam *pParam = (SSchCallbackParam *)param;
  qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
}

/*
 * Select the transport callback for a request of type msgType.
 *
 * On success *fp is set. For an unknown type an error is logged,
 * TSDB_CODE_QRY_APP_ERROR is returned, and *fp is left untouched.
 */
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
  __async_send_cb_fn_t handler = NULL;

  switch (msgType) {
    case TDMT_VND_SUBMIT:    handler = schHandleSubmitCallback; break;
    case TDMT_VND_QUERY:     handler = schHandleQueryCallback;  break;
    case TDMT_VND_RES_READY: handler = schHandleReadyCallback;  break;
    case TDMT_VND_FETCH:     handler = schHandleFetchCallback;  break;
    case TDMT_VND_DROP_TASK: handler = schHandleDropCallback;   break;
    default:
      qError("unknown msg type:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  *fp = handler;
  return TSDB_CODE_SUCCESS;
}


/*
 * Send `msg` (msgSize bytes) asynchronously to epSet through `transport`.
 *
 * A heap SSchCallbackParam {qId, tId} is attached, so the response callback
 * (picked by schGetCallbackFp() from msgType) can locate the job and task.
 * The SMsgSendInfo and the param are passed to asyncSendMsgToServer(). If any
 * step fails, both are freed here. `msg` itself is never freed by this
 * function.
 */
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
  int32_t code = 0;
  SSchCallbackParam *cbParam = NULL;
  __async_send_cb_fn_t cbFp = NULL;
  int64_t transId = 0;

  SMsgSendInfo *sendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (NULL == sendInfo) {
    qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  cbParam = calloc(1, sizeof(SSchCallbackParam));
  if (NULL == cbParam) {
    qError("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &cbFp));

  cbParam->queryId = qId;
  cbParam->taskId = tId;

  sendInfo->param = cbParam;
  sendInfo->fp = cbFp;
  sendInfo->msgType = msgType;
  sendInfo->msgInfo.pData = msg;
  sendInfo->msgInfo.len = msgSize;

  SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transId, sendInfo));

  return TSDB_CODE_SUCCESS;

_return:
  tfree(cbParam);
  tfree(sendInfo);

  SCH_RET(code);
}


/*
 * Build the request of type msgType for `task` and send it asynchronously to
 * task->plan->execEpSet.
 *
 *  - SUBMIT:    sends task->msg as is (it must be non-empty).
 *  - QUERY:     copies task->msg into a newly allocated SSubQueryMsg.
 *  - RES_READY / FETCH / DROP_TASK: allocates a fixed-size request carrying
 *               scheduler id, query id and task id (converted with htobe64).
 *
 * If sending fails, the message buffer is freed. For SUBMIT that buffer is
 * task->msg itself, so task->msg/msgLen are cleared as well. Otherwise the
 * task would keep a dangling pointer that could later be reused or freed
 * again.
 */
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
  uint32_t msgSize = 0;
  void *msg = NULL;
  int32_t code = 0;

  switch (msgType) {
    case TDMT_VND_SUBMIT: {
      if (NULL == task->msg || task->msgLen <= 0) {
        qError("submit msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      msgSize = task->msgLen;
      msg = task->msg;
      break;
    }
    case TDMT_VND_QUERY: {
      if (NULL == task->msg) {
        qError("query msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      msgSize = sizeof(SSubQueryMsg) + task->msgLen;
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", (int32_t)msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSubQueryMsg *pMsg = msg;

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      pMsg->contentLen = htonl(task->msgLen);
      memcpy(pMsg->msg, task->msg, task->msgLen);
      break;
    }
    case TDMT_VND_RES_READY: {
      msgSize = sizeof(SResReadyMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", (int32_t)msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResReadyMsg *pMsg = msg;
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    case TDMT_VND_FETCH: {
      if (NULL == task) {
        SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      msgSize = sizeof(SResFetchMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", (int32_t)msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResFetchMsg *pMsg = msg;
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    case TDMT_VND_DROP_TASK: {
      msgSize = sizeof(STaskDropMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", (int32_t)msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      STaskDropMsg *pMsg = msg;
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    default:
      qError("unknown msg type:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
  }

  SCH_ERR_JRET(schAsyncSendMsg(job->transport, &task->plan->execEpSet, job->queryId, task->taskId, msgType, msg, msgSize));

  return TSDB_CODE_SUCCESS;

_return:
  // SUBMIT sends task->msg directly: detach it before freeing so the task
  // does not keep pointing at released memory.
  if (msg == task->msg) {
    task->msg = NULL;
    task->msgLen = 0;
  }

  tfree(msg);
  SCH_RET(code);
}
D
dapan1121 已提交
735 736


D
dapan 已提交
737
/*
 * Serialize task->plan and send it to the plan's execEpSet: SUBMIT for
 * QUERY_TYPE_MODIFY plans, QUERY otherwise. If the plan has no endpoints, the
 * epSet is filled from schSetTaskExecEpSet() first.
 *
 * The task is registered in job->execTasks and marked EXECUTING *before* the
 * message is sent. The response callback looks the task up in execTasks and
 * may run on another thread before schBuildAndSendMsg() returns. Registering
 * afterwards lost fast responses and could overwrite a SUCCEED status with
 * EXECUTING. If sending fails, the registration and the status change are
 * rolled back.
 */
int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
  SSubplan *plan = task->plan;
  SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
  if (plan->execEpSet.numOfEps <= 0) {
    SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet));
  }

  if (plan->execEpSet.numOfEps <= 0) {
    SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY;

  SCH_ERR_RET(schPushTaskToExecList(job, task));

  int32_t prevStatus = task->status;
  task->status = JOB_TASK_STATUS_EXECUTING;

  int32_t sendCode = schBuildAndSendMsg(job, task, msgType);
  if (sendCode != TSDB_CODE_SUCCESS) {
    // Best-effort rollback; the send error is what gets reported.
    taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId));
    task->status = prevStatus;
    SCH_ERR_RET(sendCode);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
760 761


D
dapan 已提交
762 763
/*
 * Launch every task of the job's starting level (job->levelIdx, the deepest
 * level set up by schValidateAndBuildJob()).
 *
 * job->status is set to EXECUTING *before* the tasks are launched. Responses
 * arrive asynchronously and may already advance the job (e.g. to
 * PARTIAL_SUCCEED). Setting EXECUTING afterwards could clobber that state.
 * Returns the first launch error, if any.
 */
int32_t schLaunchJob(SSchJob *job) {
  if (NULL == job->levels || job->levelIdx < 0 || job->levelIdx >= job->levelNum) {
    qError("invalid level idx:%d, level num:%d", job->levelIdx, job->levelNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);

  job->status = JOB_TASK_STATUS_EXECUTING;

  for (int32_t i = 0; i < level->taskNum; ++i) {
    SSchTask *task = taosArrayGet(level->subTasks, i);
    SCH_ERR_RET(schLaunchTask(job, task));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
774
void schDropJobAllTasks(SSchJob *job) {
D
dapan1121 已提交
775 776
  void *pIter = taosHashIterate(job->succTasks, NULL);
  while (pIter) {
D
dapan 已提交
777
    SSchTask *task = *(SSchTask **)pIter;
D
dapan1121 已提交
778
  
D
dapan1121 已提交
779
    schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
D
dapan1121 已提交
780
    
D
dapan1121 已提交
781 782 783 784 785
    pIter = taosHashIterate(job->succTasks, pIter);
  }  

  pIter = taosHashIterate(job->failTasks, NULL);
  while (pIter) {
D
dapan 已提交
786
    SSchTask *task = *(SSchTask **)pIter;
D
dapan1121 已提交
787
  
D
dapan1121 已提交
788
    schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
D
dapan1121 已提交
789 790
    
    pIter = taosHashIterate(job->succTasks, pIter);
D
dapan1121 已提交
791 792
  }  
}
793

D
dapan1121 已提交
794 795 796 797 798 799 800 801 802 803 804
/*
 * Produce the id of this scheduler instance.
 * Real id generation is not implemented yet (TODO), so the value is always 0.
 */
uint64_t schGenSchId(void) {
  const uint64_t schId = 0;  // TODO: replace placeholder with a generated id

  qDebug("Gen sId:0x%"PRIx64, schId);
  return schId;
}


805
/*
 * Initialize the global scheduler state (schMgmt).
 *
 * cfg: optional configuration; when NULL, or when cfg->maxJobNum <= 0,
 *      SCHEDULE_DEFAULT_JOB_NUMBER is used as the job capacity.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT when called a second
 * time without schedulerDestroy, or TSDB_CODE_QRY_OUT_OF_MEMORY when the job
 * table cannot be created.
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  if (schMgmt.jobs) {
    qError("scheduler already init");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (cfg) {
    schMgmt.cfg = *cfg;

    if (schMgmt.cfg.maxJobNum <= 0) {
      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
    }
  } else {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
  }

  // Jobs are keyed by their 64-bit query id.
  schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == schMgmt.jobs) {
    SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
  }

  schMgmt.sId = schGenSchId();

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
832
/*
 * Create a job from `pDag`, register it in schMgmt.jobs and launch its first
 * level of tasks.
 *
 * transport:    transport handle used to send task messages.
 * qnodeList:    optional list of qnodes (NULL allowed; empty only logs).
 * pDag:         query DAG to execute.
 * pJob:         receives the created SSchJob* on success, NULL on failure.
 * syncSchedule: when true, block on job->rspSem until it is posted.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered; on error the
 * partially built job is handed to scheduleFreeJob.
 */
int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
  if (qnodeList && taosArrayGetSize(qnodeList) <= 0) {
    qInfo("qnodeList is empty");
  }

  int32_t code = 0;
  SSchJob *job = calloc(1, sizeof(*job));
  if (NULL == job) {
    // Go through _return so *pJob is cleared like on every other failure
    // path; scheduleFreeJob(NULL) is a no-op.
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->attr.syncSchedule = syncSchedule;
  job->transport = transport;
  job->qnodeList = qnodeList;

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));

  job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->execTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->succTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->failTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Both the synchronous wait below and scheduleFetchRows block on this
  // semaphore, so a failed init must not be ignored.
  if (0 != tsem_init(&job->rspSem, 0, 0)) {
    qError("tsem_init failed");
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  // NOTE(review): job->queryId is not assigned in this function; presumably
  // schValidateAndBuildJob fills it from pDag — confirm.
  code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      qError("taosHashPut queryId:%"PRIx64" already exist", job->queryId);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    } else {
      qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
  }

  job->status = JOB_TASK_STATUS_NOT_START;

  SCH_ERR_JRET(schLaunchJob(job));

  *(SSchJob **)pJob = job;

  if (syncSchedule) {
    tsem_wait(&job->rspSem);
  }

  return TSDB_CODE_SUCCESS;

_return:

  *(SSchJob **)pJob = NULL;
  scheduleFreeJob(job);

  SCH_RET(code);
}
D
dapan1121 已提交
899

D
dapan1121 已提交
900
/*
 * Execute `pDag` synchronously: launch the job and wait until its response
 * semaphore is posted, then report the number of result rows.
 *
 * qnodeList may be NULL (its NULL check is intentionally disabled below).
 * *numOfRows is set to 0 up front and to job->resNumOfRows on success.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT for missing arguments, otherwise the
 * result of scheduleExecJobImpl.
 * NOTE(review): job->status / job->errCode are not inspected after the wait,
 * so a job that failed remotely is still reported as success here — confirm
 * that callers check the job themselves.
 */
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) {
  if (NULL == transport || /* NULL == qnodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == numOfRows) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  *numOfRows = 0;

  SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true));

  SSchJob *job = *(SSchJob **)pJob;

  *numOfRows = job->resNumOfRows;

  return TSDB_CODE_SUCCESS;
}

/*
 * Execute `pDag` asynchronously: launch the job and return without waiting.
 * Unlike scheduleExecJob, a non-NULL qnodeList is required here.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT for missing arguments, otherwise the
 * result of scheduleExecJobImpl (*pJob receives the job handle on success).
 */
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
  if (NULL == transport || NULL == qnodeList || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false);
}


D
dapan1121 已提交
925 926
int32_t scheduleFetchRows(void *pJob, void **data) {
  if (NULL == pJob || NULL == data) {
D
dapan1121 已提交
927
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
928 929
  }

D
dapan 已提交
930
  SSchJob *job = pJob;
D
dapan 已提交
931
  int32_t code = 0;
D
dapan 已提交
932

D
dapan1121 已提交
933 934 935 936 937
  if (!job->attr.needFetch) {
    qError("no need to fetch data");
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan 已提交
938 939 940 941 942
  if (job->status == JOB_TASK_STATUS_FAILED) {
    job->res = NULL;
    SCH_RET(job->errCode);
  }

D
dapan1121 已提交
943 944 945 946 947
  if (job->status == JOB_TASK_STATUS_SUCCEED) {
    job->res = NULL;
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
948 949
  if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
    qError("prior fetching not finished");
D
dapan1121 已提交
950
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
951 952
  }

D
dapan1121 已提交
953
  if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan 已提交
954 955
    SCH_ERR_JRET(schFetchFromRemote(job));
  }
D
dapan 已提交
956 957 958

  tsem_wait(&job->rspSem);

D
dapan 已提交
959 960 961 962 963 964 965 966
  if (job->status == JOB_TASK_STATUS_FAILED) {
    code = job->errCode;
  }
  
  if (job->res && ((SRetrieveTableRsp *)job->res)->completed) {
    job->status = JOB_TASK_STATUS_SUCCEED;
  }

D
dapan 已提交
967
  *data = job->res;
D
dapan 已提交
968
  job->res = NULL;
D
dapan 已提交
969

D
dapan 已提交
970 971 972
_return:
  atomic_val_compare_exchange_32(&job->userFetch, 1, 0);

D
dapan1121 已提交
973
  SCH_RET(code);
D
dapan 已提交
974
}
D
dapan1121 已提交
975

D
dapan1121 已提交
976 977
int32_t scheduleCancelJob(void *pJob) {
  //TODO
D
dapan1121 已提交
978

D
dapan1121 已提交
979 980
  //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST

D
dapan1121 已提交
981 982 983 984 985
  return TSDB_CODE_SUCCESS;
}

void scheduleFreeJob(void *pJob) {
  if (NULL == pJob) {
D
dapan 已提交
986 987
    return;
  }
D
dapan1121 已提交
988

D
dapan 已提交
989
  SSchJob *job = pJob;
D
dapan1121 已提交
990 991

  if (job->status > 0) {
D
dapan1121 已提交
992
    if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) {
D
dapan1121 已提交
993 994 995 996
      qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed
      return;
    }

D
dapan1121 已提交
997
    if (job->status == JOB_TASK_STATUS_EXECUTING) {
D
dapan1121 已提交
998 999
      scheduleCancelJob(pJob);
    }
D
dapan1121 已提交
1000 1001

    schDropJobAllTasks(job);
D
dapan1121 已提交
1002 1003 1004
  }
  
  //TODO free job
D
dapan1121 已提交
1005 1006
}

1007
void schedulerDestroy(void) {
D
dapan1121 已提交
1008 1009 1010
  if (schMgmt.jobs) {
    taosHashCleanup(schMgmt.jobs); //TODO
    schMgmt.jobs = NULL;
1011 1012 1013
  }
}

D
dapan1121 已提交
1014