scheduler.c 28.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

D
dapan1121 已提交
16
#include "schedulerInt.h"
H
Hongze Cheng 已提交
17
#include "tmsg.h"
18
#include "query.h"
D
dapan 已提交
19
#include "catalog.h"
20 21

SSchedulerMgmt schMgmt = {0};
D
dapan1121 已提交
22 23


D
dapan 已提交
24
/*
 * Wire up parent/child links between the tasks of an already-built job.
 *
 * For every task, the subplans listed in its plan's pChildern / pParents arrays
 * are translated into SSchTask pointers via planToTask (subplan pointer -> task
 * pointer) and stored in task->children / task->parents.
 *
 * Afterwards the root level (level 0) is validated: a query job may have only
 * one task there, and that task must have no parents.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or
 * TSDB_CODE_SCH_INTERNAL_ERROR when the plan graph is inconsistent.
 */
int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
  for (int32_t i = 0; i < job->levelNum; ++i) {
    SSchLevel *level = taosArrayGet(job->levels, i);

    for (int32_t m = 0; m < level->taskNum; ++m) {
      SSchTask *task = taosArrayGet(level->subTasks, m);
      SSubplan *plan = task->plan;
      int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
      int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;

      if (childNum > 0) {
        task->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == task->children) {
          qError("taosArrayInit %d failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan **child = taosArrayGet(plan->pChildern, n);
        SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->children, childTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        task->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == task->parents) {
          qError("taosArrayInit %d failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan **parent = taosArrayGet(plan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n indexes the parent list here, not the child list.
          qError("subplan relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->parents, parentTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    }
  }

  // Validate the root level: at most one task for a query job, and it has no parents.
  SSchLevel *level = taosArrayGet(job->levels, 0);
  if (NULL == level) {
    qError("invalid plan info, no level 0");
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  if (job->attr.queryJob && level->taskNum > 1) {
    qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SSchTask *task = taosArrayGet(level->subTasks, 0);
  if (task->parents && taosArrayGetSize(task->parents) > 0) {
    qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
97
/*
 * Error-path cleanup for schValidateAndBuildJob: release every level, every
 * task's children/parents arrays and every subTasks array built so far. Then
 * reset job->levels / job->levelNum so nothing dangles for a later cleanup.
 */
static void schDestroyBuildLevels(SSchJob *job) {
  if (NULL == job->levels) {
    return;
  }

  int32_t levelCount = (int32_t)taosArrayGetSize(job->levels);
  for (int32_t i = 0; i < levelCount; ++i) {
    SSchLevel *lvl = taosArrayGet(job->levels, i);
    if (NULL == lvl || NULL == lvl->subTasks) {
      continue;
    }

    int32_t taskCount = (int32_t)taosArrayGetSize(lvl->subTasks);
    for (int32_t m = 0; m < taskCount; ++m) {
      SSchTask *t = taosArrayGet(lvl->subTasks, m);
      if (t->children) {
        taosArrayDestroy(t->children);
        t->children = NULL;
      }
      if (t->parents) {
        taosArrayDestroy(t->parents);
        t->parents = NULL;
      }
    }

    taosArrayDestroy(lvl->subTasks);
    lvl->subTasks = NULL;
  }

  taosArrayDestroy(job->levels);
  job->levels = NULL;
  job->levelNum = 0;
}

/*
 * Build the job's level/task structure from a query DAG.
 *
 * dag->pSubplans is read as an array of per-level subplan arrays. For each
 * level one SSchLevel is appended to job->levels, with one SSchTask per subplan
 * in its subTasks. Parent/child links are then filled in by schBuildTaskRalation.
 *
 * Side effects on job: queryId, levels, levelNum, levelIdx (the last level),
 * subPlans (borrowed from dag), and attr.needFetch / attr.queryJob (derived
 * from the subplan types).
 *
 * On failure, everything allocated here is released and job->levels is NULL.
 */
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
  int32_t code = 0;

  job->queryId = dag->queryId;

  if (dag->numOfSubplans <= 0) {
    qError("invalid subplan num:%d", dag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
  if (levelNum <= 0) {
    qError("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Temporary map from subplan pointer to the task built for it.
  SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == planToTask) {
    qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Capacity is exactly levelNum, so pointers into this array stay valid while
  // levels are pushed below (task.level relies on that).
  job->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == job->levels) {
    qError("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->attr.needFetch = true;

  job->levelNum = levelNum;
  job->levelIdx = levelNum - 1;

  job->subPlans = dag->pSubplans;

  SSchLevel level = {0};
  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(job->levels, &level)) {
      qError("taosArrayPush failed");
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SSchLevel *pLevel = taosArrayGet(job->levels, i);
    pLevel->level = i;

    SArray *levelPlans = taosArrayGetP(dag->pSubplans, i);
    if (NULL == levelPlans) {
      qError("no level plans for level %d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    int32_t levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
    if (levelPlanNum <= 0) {
      qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = levelPlanNum;

    // Same reasoning as job->levels: pre-sized so task pointers stored in planToTask stay valid.
    pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      qError("taosArrayInit %d failed", levelPlanNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < levelPlanNum; ++n) {
      SSubplan *plan = taosArrayGet(levelPlans, n);
      SSchTask task = {0};

      // Any modify subplan means there is nothing to fetch; anything else marks a query job.
      if (plan->type == QUERY_TYPE_MODIFY) {
        job->attr.needFetch = false;
      } else {
        job->attr.queryJob = true;
      }

      task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
      task.plan = plan;
      task.level = pLevel;
      task.status = JOB_TASK_STATUS_NOT_START;

      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        qError("taosArrayPush failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        qError("taosHashPut failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

  SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));

  taosHashCleanup(planToTask);

  return TSDB_CODE_SUCCESS;

_return:
  // Release every level built so far, not just the last one; pLevel may never have been set.
  schDestroyBuildLevels(job);

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan 已提交
218
/*
 * Fill epSet with candidate endpoints for executing a task that has none yet.
 *
 * Candidates are appended in order: first every address in job->nodeList, then
 * every endpoint recorded in job->dataSrcEps. Appending stops when epSet is full
 * (tListLen(epSet->port)). Nothing is done if epSet already holds
 * SCH_MAX_CONDIDATE_EP_NUM or more endpoints.
 *
 * FQDNs are copied bounded by the destination slot size and always
 * NUL-terminated.
 */
int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) {
  if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
    return TSDB_CODE_SUCCESS;
  }

  const size_t fqdnCap = sizeof(epSet->fqdn[0]);
  int32_t nodeNum = job->nodeList ? (int32_t)taosArrayGetSize(job->nodeList) : 0;

  for (int32_t i = 0; i < nodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) {
    SEpAddr *addr = taosArrayGet(job->nodeList, i);
    char *dst = epSet->fqdn[epSet->numOfEps];

    // Bound by the destination, not the source, and force termination: strncpy
    // does not write a NUL when the source fills the buffer.
    strncpy(dst, addr->fqdn, fqdnCap - 1);
    dst[fqdnCap - 1] = '\0';
    epSet->port[epSet->numOfEps] = addr->port;

    ++epSet->numOfEps;
  }

  for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) {
    char *dst = epSet->fqdn[epSet->numOfEps];

    strncpy(dst, job->dataSrcEps.fqdn[i], fqdnCap - 1);
    dst[fqdnCap - 1] = '\0';
    epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];

    ++epSet->numOfEps;
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
243

D
dapan1121 已提交
244

D
dapan 已提交
245
/*
 * Record `task` in job->execTasks, keyed by its taskId, with the task pointer
 * as the value. A failed insert is reported as TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) {
  int32_t putRet = taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (0 == putRet) {
    return TSDB_CODE_SUCCESS;
  }

  qError("taosHashPut failed");
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
254
/*
 * Move `task` from job->execTasks to job->succTasks.
 *
 * If the task is not present in execTasks, a warning is logged, *moved is left
 * untouched and success is returned. Otherwise the task is inserted into
 * succTasks and *moved is set to true. A failed insert returns
 * TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
  int32_t removeRet = taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId));
  if (0 != removeRet) {
    qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
    return TSDB_CODE_SUCCESS;
  }

  int32_t putRet = taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (0 != putRet) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
270
/*
 * Move `task` into job->failTasks.
 *
 * Removal from execTasks is best-effort: a missing entry only produces a
 * warning. The task is always inserted into failTasks, and on success *moved
 * is set to true. A failed insert returns TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
  int32_t removeRet = taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId));
  if (0 != removeRet) {
    qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId);
  }

  int32_t putRet = taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (0 != putRet) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
285
/*
 * Decide whether a failed task should be relaunched.
 *
 * Currently a stub: *needRetry is always set to false, and job, task and
 * errCode are not consulted.
 *
 * TODO: decide based on task type / errCode / retry times / job status /
 *       available eps, and when retrying, record the retry info on the task.
 */
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
  *needRetry = false;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
294

D
dapan 已提交
295
/*
 * Send a FETCH request for job->fetchTask, allowing only one in flight.
 *
 * job->remoteFetch acts as a flag: it is switched 0 -> 1 before sending. If it
 * was already 1, the call returns success without sending. If the send fails,
 * the flag is switched back to 0 and the error code is returned.
 */
int32_t schFetchFromRemote(SSchJob *job) {
  int32_t code = 0;

  int32_t prevFlag = atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1);
  if (0 != prevFlag) {
    qInfo("prior fetching not finished");
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schBuildAndSendMsg(job, job->fetchTask, TDMT_VND_FETCH));
  return TSDB_CODE_SUCCESS;

_return:
  // Release the flag so a later fetch attempt can proceed.
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
  return code;
}

D
dapan 已提交
313

D
dapan 已提交
314
/*
 * Mark the job as partially succeeded (root task done).
 *
 * A synchronous job with nothing to fetch has its waiter on rspSem released
 * right away. If the user has already requested results (userFetch), a remote
 * fetch is started.
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
  job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;

  bool wakeSyncWaiter = job->attr.syncSchedule && !job->attr.needFetch;
  if (wakeSyncWaiter) {
    tsem_post(&job->rspSem);
  }

  if (!job->userFetch) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schFetchFromRemote(job));
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
328
/*
 * Mark the job as failed with errCode.
 *
 * The remote-fetch flag is cleared (1 -> 0). rspSem is posted when a user fetch
 * is pending, or when a synchronous job with nothing to fetch is waiting on it.
 */
int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
  job->status = JOB_TASK_STATUS_FAILED;
  job->errCode = errCode;

  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  bool syncWithoutFetch = job->attr.syncSchedule && !job->attr.needFetch;
  if (!job->userFetch && !syncWithoutFetch) {
    return TSDB_CODE_SUCCESS;
  }

  tsem_post(&job->rspSem);
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
341
/*
 * Called once a FETCH response has been stored on the job: clear the
 * remote-fetch flag (1 -> 0) and wake the waiter on rspSem.
 *
 * Always returns TSDB_CODE_SUCCESS. The original fell off the end without a
 * return statement, which is undefined behavior because callers use the result.
 */
int32_t schProcessOnDataFetched(SSchJob *job) {
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  tsem_post(&job->rspSem);

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
348
/*
 * Handle successful completion of `task`.
 *
 * The task is moved to job->succTasks. If it was not in execTasks, an error is
 * logged and nothing else happens.
 *
 * Task with parents:
 *   - a data-source task's exec address is remembered in job->dataSrcEps
 *     (bounded by SCH_MAX_CONDIDATE_EP_NUM);
 *   - each parent's childReady count is bumped and this task's exec address is
 *     set on the parent's plan;
 *   - each parent that SCH_TASK_READY_TO_LUNCH reports as ready is launched.
 *
 * Task without parents (root, must be plan level 0):
 *   - when SCH_TASK_NEED_WAIT_ALL(task), the level's success counter is bumped
 *     under the level lock, and the job only advances once every task of the
 *     level is done (failing the job if any of them failed);
 *   - otherwise the task's exec address becomes job->resEp;
 *   - the task becomes job->fetchTask and the job is marked partially succeeded.
 */
int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
  bool moved = false;

  SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
  if (false == moved) {
    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
    return TSDB_CODE_SUCCESS;
  }

  task->status = JOB_TASK_STATUS_SUCCEED;

  int32_t parentCount = (NULL == task->parents) ? 0 : (int32_t)taosArrayGetSize(task->parents);

  if (parentCount > 0) {
    // Non-root task: feed its result location to its parents.
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
      int32_t slot = job->dataSrcEps.numOfEps;
      strncpy(job->dataSrcEps.fqdn[slot], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
      job->dataSrcEps.port[slot] = task->execAddr.port;

      ++job->dataSrcEps.numOfEps;
    }

    for (int32_t idx = 0; idx < parentCount; ++idx) {
      SSchTask *parentTask = *(SSchTask **)taosArrayGet(task->parents, idx);

      ++parentTask->childReady;

      SCH_ERR_RET(qSetSubplanExecutionNode(parentTask->plan, task->plan->id.templateId, &task->execAddr));

      if (SCH_TASK_READY_TO_LUNCH(parentTask)) {
        SCH_ERR_RET(schLaunchTask(job, parentTask));
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  // Root task.
  if (0 != task->plan->level) {
    qError("level error");
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (SCH_TASK_NEED_WAIT_ALL(task)) {
    SSchLevel *lvl = task->level;
    int32_t doneCount = 0;

    SCH_LOCK(SCH_WRITE, &lvl->lock);
    lvl->taskSucceed++;
    doneCount = lvl->taskSucceed + lvl->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &lvl->lock);

    if (doneCount < lvl->taskNum) {
      qDebug("wait all tasks, done:%d, all:%d", doneCount, lvl->taskNum);
      return TSDB_CODE_SUCCESS;
    }

    if (lvl->taskFailed > 0) {
      job->status = JOB_TASK_STATUS_FAILED;
      SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR));

      return TSDB_CODE_SUCCESS;
    }
  } else {
    strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn));
    job->resEp.port = task->execAddr.port;
  }

  job->fetchTask = task;

  SCH_ERR_RET(schProcessOnJobPartialSuccess(job));

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
419
/*
 * Handle failure of `task` with errCode.
 *
 * If schTaskCheckAndSetRetry requests a retry, the task is relaunched.
 * Otherwise the task is moved to job->failTasks.
 *
 * When SCH_TASK_NEED_WAIT_ALL(task), the level's failure counter is bumped
 * under the level lock, and the job is only failed once every task of the level
 * is done. In all other cases the job is failed immediately with errCode.
 */
int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
  bool needRetry = false;

  SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_RET(schLaunchTask(job, task));
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);

  bool moved = false;
  SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
  if (false == moved) {
    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
  }

  if (SCH_TASK_NEED_WAIT_ALL(task)) {
    SSchLevel *lvl = task->level;
    int32_t doneCount = 0;

    SCH_LOCK(SCH_WRITE, &lvl->lock);
    lvl->taskFailed++;
    doneCount = lvl->taskSucceed + lvl->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &lvl->lock);

    if (doneCount < lvl->taskNum) {
      qDebug("wait all tasks, done:%d, all:%d", doneCount, lvl->taskNum);
      return TSDB_CODE_SUCCESS;
    }
  }

  job->status = JOB_TASK_STATUS_FAILED;
  SCH_ERR_RET(schProcessOnJobFailure(job, errCode));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
456
/*
 * Dispatch one RPC response for `task` of `job`.
 *
 * msgType is the response type chosen by the callback wrapper; msg is the raw
 * response body (may be NULL when the transport failed); rspCode is the
 * transport-level result.
 *
 *  - SUBMIT_RSP: adds affectedRows to job->resNumOfRows and marks the task
 *    succeeded.
 *  - QUERY_RSP: sends a RES_READY request for the task.
 *  - RES_READY_RSP: marks the task succeeded.
 *  - FETCH_RSP: stores the retrieved block on the job and wakes the fetcher.
 *
 * For SUBMIT/QUERY/RES_READY, a non-zero rspCode or a missing body is handled
 * as a task failure instead of dereferencing the body.
 *
 * Errors raised while handling success are routed to the task-failure path
 * (_task_error). Errors from the failure path itself fail the whole job
 * (_return).
 */
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
  int32_t code = 0;

  switch (msgType) {
    case TDMT_VND_SUBMIT_RSP: {
      SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
      int32_t taskCode = (TSDB_CODE_SUCCESS != rspCode) ? rspCode : ((NULL == rsp) ? TSDB_CODE_SCH_INTERNAL_ERROR : rsp->code);

      if (taskCode != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, taskCode));
      } else {
        job->resNumOfRows += rsp->affectedRows;

        code = schProcessOnTaskSuccess(job, task);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TDMT_VND_QUERY_RSP: {
      SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
      int32_t taskCode = (TSDB_CODE_SUCCESS != rspCode) ? rspCode : ((NULL == rsp) ? TSDB_CODE_SCH_INTERNAL_ERROR : rsp->code);

      if (taskCode != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, taskCode));
      } else {
        code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TDMT_VND_RES_READY_RSP: {
      SResReadyRsp *rsp = (SResReadyRsp *)msg;
      int32_t taskCode = (TSDB_CODE_SUCCESS != rspCode) ? rspCode : ((NULL == rsp) ? TSDB_CODE_SCH_INTERNAL_ERROR : rsp->code);

      if (taskCode != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, taskCode));
      } else {
        code = schProcessOnTaskSuccess(job, task);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TDMT_VND_FETCH_RSP: {
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        qError("empty fetch rsp received");
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;

      job->res = rsp;
      job->resNumOfRows = rsp->numOfRows;

      SCH_ERR_JRET(schProcessOnDataFetched(job));
      break;
    }
    case TDMT_VND_DROP_TASK: {
      // Drop responses are registered with schHandleDropCallback (see
      // schGetCallbackFp), not delivered here. Previously this empty case fell
      // through into `default` and was reported as an unknown message type.
      break;
    }
    default:
      qError("unknown msg type:%d received", msgType);
      return TSDB_CODE_QRY_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;

_task_error:
  SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code));
  return TSDB_CODE_SUCCESS;

_return:
  code = schProcessOnJobFailure(job, code);
  return code;
}

D
dapan 已提交
529

D
dapan1121 已提交
530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
  int32_t code = 0;
  SSchCallbackParam *pParam = (SSchCallbackParam *)param;
  
  SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
  if (NULL == job || NULL == (*job)) {
    qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId);
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId));
  if (NULL == task || NULL == (*task)) {
    qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId);
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }
  
  schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode);

_return:  
  tfree(param);

  SCH_RET(code);
}

/*
 * Per-request transport callbacks (selected by schGetCallbackFp). Each one
 * forwards to schHandleCallback, tagging the response with the response type
 * that matches the request it was registered for.
 */
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t rspCode) {
  const int32_t rspType = TDMT_VND_SUBMIT_RSP;
  return schHandleCallback(param, pMsg, rspType, rspCode);
}

int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t rspCode) {
  const int32_t rspType = TDMT_VND_QUERY_RSP;
  return schHandleCallback(param, pMsg, rspType, rspCode);
}

int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t rspCode) {
  const int32_t rspType = TDMT_VND_FETCH_RSP;
  return schHandleCallback(param, pMsg, rspType, rspCode);
}

int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t rspCode) {
  const int32_t rspType = TDMT_VND_RES_READY_RSP;
  return schHandleCallback(param, pMsg, rspType, rspCode);
}
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {  
  SSchCallbackParam *pParam = (SSchCallbackParam *)param;
  qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
}

/*
 * Map a request message type to the transport callback that handles its
 * response. *fp is written only for known types. Unknown types are logged and
 * yield TSDB_CODE_QRY_APP_ERROR.
 */
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
  __async_send_cb_fn_t handler = NULL;

  switch (msgType) {
    case TDMT_VND_SUBMIT:     handler = schHandleSubmitCallback; break;
    case TDMT_VND_QUERY:      handler = schHandleQueryCallback;  break;
    case TDMT_VND_RES_READY:  handler = schHandleReadyCallback;  break;
    case TDMT_VND_FETCH:      handler = schHandleFetchCallback;  break;
    case TDMT_VND_DROP_TASK:  handler = schHandleDropCallback;   break;
    default:
      qError("unknown msg type:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  *fp = handler;
  return TSDB_CODE_SUCCESS;
}


/*
 * Asynchronously send `msg` (msgSize bytes, type msgType) to epSet over
 * `transport`.
 *
 * A SMsgSendInfo and a SSchCallbackParam carrying (qId, tId) are allocated. The
 * response callback for msgType is chosen by schGetCallbackFp. If any step
 * fails, both allocations are freed here. `msg` itself is not freed by this
 * function.
 */
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
  int32_t code = 0;
  SSchCallbackParam *param = NULL;
  __async_send_cb_fn_t fp = NULL;

  SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = calloc(1, sizeof(SSchCallbackParam));
  if (NULL == param) {
    qError("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  param->queryId = qId;
  param->taskId = tId;

  pMsgSendInfo->param = param;
  pMsgSendInfo->msgType = msgType;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->fp = fp;

  int64_t transporterId = 0;
  SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo));

  return TSDB_CODE_SUCCESS;

_return:
  tfree(param);
  tfree(pMsgSendInfo);

  SCH_RET(code);
}


/*
 * Build the request of type msgType for `task` and send it asynchronously to
 * task->plan->execEpSet.
 *
 *  - SUBMIT sends task->msg as-is.
 *  - QUERY wraps task->msg in an SSubQueryMsg header.
 *  - RES_READY / FETCH / DROP_TASK send a small fixed-size header.
 *
 * Header ids are converted to network byte order (htobe64 / htonl).
 *
 * On a send failure the message buffer is freed. For SUBMIT that buffer is
 * task->msg itself, so the task's reference is cleared as well, leaving the
 * task without a dangling pointer to freed memory.
 */
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
  uint32_t msgSize = 0;
  void *msg = NULL;
  int32_t code = 0;

  // Every message type below reads task->taskId and task->plan.
  if (NULL == task) {
    qError("task is NULL, msgType:%d", msgType);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  switch (msgType) {
    case TDMT_VND_SUBMIT: {
      if (NULL == task->msg || task->msgLen <= 0) {
        qError("submit msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      msgSize = task->msgLen;
      msg = task->msg;
      break;
    }
    case TDMT_VND_QUERY: {
      if (NULL == task->msg) {
        qError("query msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      msgSize = sizeof(SSubQueryMsg) + task->msgLen;
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSubQueryMsg *pMsg = msg;

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      pMsg->contentLen = htonl(task->msgLen);
      memcpy(pMsg->msg, task->msg, task->msgLen);
      break;
    }
    case TDMT_VND_RES_READY: {
      msgSize = sizeof(SResReadyMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResReadyMsg *pMsg = msg;
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    case TDMT_VND_FETCH: {
      msgSize = sizeof(SResFetchMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResFetchMsg *pMsg = msg;
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    case TDMT_VND_DROP_TASK: {
      msgSize = sizeof(STaskDropMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      STaskDropMsg *pMsg = msg;
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    default:
      qError("unknown msg type:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
  }

  SCH_ERR_JRET(schAsyncSendMsg(job->transport, &task->plan->execEpSet, job->queryId, task->taskId, msgType, msg, msgSize));

  return TSDB_CODE_SUCCESS;

_return:
  // For SUBMIT, msg aliases task->msg: drop the task's reference before freeing.
  if (msg == task->msg) {
    task->msg = NULL;
    task->msgLen = 0;
  }

  tfree(msg);
  SCH_RET(code);
}
D
dapan1121 已提交
735 736


D
dapan 已提交
737
/*
 * Serialize `task`'s subplan into task->msg and send it for execution.
 *
 * If the plan has no exec endpoints yet, candidates are filled in by
 * schSetTaskExecEpSet; launching fails if there are still none. MODIFY plans are
 * sent as SUBMIT, everything else as QUERY.
 *
 * The task is registered in job->execTasks (and marked EXECUTING) BEFORE the
 * request is sent. The response callback resolves the task through execTasks
 * and may run before the send call returns. If the send fails, the
 * registration and the previous status are undone.
 */
int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
  SSubplan *plan = task->plan;
  SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
  if (plan->execEpSet.numOfEps <= 0) {
    SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet));
  }

  if (plan->execEpSet.numOfEps <= 0) {
    SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY;

  SCH_ERR_RET(schPushTaskToExecList(job, task));

  int32_t prevStatus = task->status;
  task->status = JOB_TASK_STATUS_EXECUTING;

  int32_t code = schBuildAndSendMsg(job, task, msgType);
  if (TSDB_CODE_SUCCESS != code) {
    // Nothing was sent, so no response will arrive: undo the registration.
    taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId));
    task->status = prevStatus;
    SCH_ERR_RET(code);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
760 761


D
dapan 已提交
762 763
/*
 * Start a job by launching every task of the level at job->levelIdx
 * (schValidateAndBuildJob sets this to the last level). Stops at the first
 * launch error. On success the job is marked EXECUTING.
 */
int32_t schLaunchJob(SSchJob *job) {
  SSchLevel *startLevel = taosArrayGet(job->levels, job->levelIdx);

  for (int32_t idx = 0; idx < startLevel->taskNum; ++idx) {
    SSchTask *pending = taosArrayGet(startLevel->subTasks, idx);
    SCH_ERR_RET(schLaunchTask(job, pending));
  }

  job->status = JOB_TASK_STATUS_EXECUTING;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
774
void schDropJobAllTasks(SSchJob *job) {
D
dapan1121 已提交
775 776
  void *pIter = taosHashIterate(job->succTasks, NULL);
  while (pIter) {
D
dapan 已提交
777
    SSchTask *task = *(SSchTask **)pIter;
D
dapan1121 已提交
778
  
D
dapan1121 已提交
779
    schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
D
dapan1121 已提交
780
    
D
dapan1121 已提交
781 782 783 784 785
    pIter = taosHashIterate(job->succTasks, pIter);
  }  

  pIter = taosHashIterate(job->failTasks, NULL);
  while (pIter) {
D
dapan 已提交
786
    SSchTask *task = *(SSchTask **)pIter;
D
dapan1121 已提交
787
  
D
dapan1121 已提交
788
    schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
D
dapan1121 已提交
789 790
    
    pIter = taosHashIterate(job->succTasks, pIter);
D
dapan1121 已提交
791 792
  }  
}
793

D
dapan1121 已提交
794 795 796 797 798 799 800 801 802 803 804
/*
 * Produce the scheduler instance id stored in schMgmt.sId.
 *
 * Not implemented yet: the id is a placeholder and is always 0.
 */
uint64_t schGenSchId(void) {
  // TODO: generate a real id.
  const uint64_t sId = 0;

  qDebug("Gen sId:0x%"PRIx64, sId);
  return sId;
}


805
/*
 * Initialise the global scheduler state (schMgmt).
 *
 * When `cfg` is non-NULL it is copied into schMgmt.cfg. A missing config or
 * a non-positive maxJobNum falls back to SCHEDULE_DEFAULT_JOB_NUMBER. The
 * job table schMgmt.jobs is then created and schMgmt.sId assigned.
 *
 * Returns TSDB_CODE_SUCCESS; TSDB_CODE_QRY_INVALID_INPUT if the scheduler
 * was already initialised; TSDB_CODE_QRY_OUT_OF_MEMORY if the job table
 * could not be created.
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  if (NULL != schMgmt.jobs) {
    qError("scheduler already init");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (NULL != cfg) {
    schMgmt.cfg = *cfg;
  }

  if (NULL == cfg || schMgmt.cfg.maxJobNum <= 0) {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
  }

  schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == schMgmt.jobs) {
    SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
  }

  schMgmt.sId = schGenSchId();
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
832 833
int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
  if (nodeList && taosArrayGetSize(nodeList) <= 0) {
D
dapan1121 已提交
834 835 836
    qInfo("qnodeList is empty");
  }

D
dapan1121 已提交
837
  int32_t code = 0;
D
dapan 已提交
838
  SSchJob *job = calloc(1, sizeof(SSchJob));
839
  if (NULL == job) {
D
dapan1121 已提交
840
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
841 842
  }

D
dapan 已提交
843
  job->attr.syncSchedule = syncSchedule;
D
dapan1121 已提交
844
  job->transport = transport;
D
dapan1121 已提交
845
  job->nodeList = nodeList;
D
dapan 已提交
846

D
dapan1121 已提交
847
  SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
D
dapan1121 已提交
848

D
dapan 已提交
849 850 851 852 853 854 855 856 857 858 859
  job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->execTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->succTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan 已提交
860

D
dapan1121 已提交
861 862 863 864 865 866
  job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->failTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan 已提交
867
  tsem_init(&job->rspSem, 0, 0);
D
dapan1121 已提交
868 869 870 871 872 873 874 875 876 877

  code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      qError("taosHashPut queryId:%"PRIx64" already exist", job->queryId);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    } else {
      qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
D
dapan1121 已提交
878 879
  }

D
dapan1121 已提交
880
  job->status = JOB_TASK_STATUS_NOT_START;
D
dapan1121 已提交
881
  
D
dapan1121 已提交
882
  SCH_ERR_JRET(schLaunchJob(job));
883

D
dapan 已提交
884
  *(SSchJob **)pJob = job;
D
dapan1121 已提交
885

D
dapan 已提交
886
  if (syncSchedule) {
D
dapan1121 已提交
887 888 889
    tsem_wait(&job->rspSem);
  }

D
dapan1121 已提交
890
  return TSDB_CODE_SUCCESS;
891

D
dapan1121 已提交
892
_return:
893

D
dapan 已提交
894
  *(SSchJob **)pJob = NULL;
D
dapan1121 已提交
895 896 897
  scheduleFreeJob(job);
  
  SCH_RET(code);
898
}
D
dapan1121 已提交
899

D
dapan1121 已提交
900 901
/*
 * Execute a query DAG synchronously.
 *
 * Runs scheduleExecJobImpl() with syncSchedule = true, so it returns only
 * after the job's rspSem has been waited on. It then copies the job's
 * errCode and resNumOfRows into *pRes. nodeList is allowed to be NULL.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT for missing arguments, otherwise the
 * scheduling result. Note that pRes->code carries the job's own error code
 * separately from the return value.
 */
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) {
  bool invalid = (NULL == transport) || (NULL == pDag) || (NULL == pDag->pSubplans) ||
                 (NULL == pJob) || (NULL == pRes);
  if (invalid) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SCH_ERR_RET(scheduleExecJobImpl(transport, nodeList, pDag, pJob, true));

  const SSchJob *job = *(SSchJob **)pJob;
  pRes->code = job->errCode;
  pRes->numOfRows = job->resNumOfRows;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
915 916
/*
 * Start executing a query DAG without waiting for it to finish
 * (scheduleExecJobImpl() with syncSchedule = false).
 *
 * Unlike scheduleExecJob(), a NULL nodeList is rejected here with
 * TSDB_CODE_QRY_INVALID_INPUT, as are NULL transport/pDag/subplans/pJob.
 */
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) {
  bool valid = transport && nodeList && pDag && pDag->pSubplans && pJob;
  if (!valid) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return scheduleExecJobImpl(transport, nodeList, pDag, pJob, false);
}


D
dapan1121 已提交
924 925
int32_t scheduleFetchRows(void *pJob, void **data) {
  if (NULL == pJob || NULL == data) {
D
dapan1121 已提交
926
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
927 928
  }

D
dapan 已提交
929
  SSchJob *job = pJob;
D
dapan 已提交
930
  int32_t code = 0;
D
dapan 已提交
931

D
dapan1121 已提交
932 933 934 935 936
  if (!job->attr.needFetch) {
    qError("no need to fetch data");
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan 已提交
937 938 939 940 941
  if (job->status == JOB_TASK_STATUS_FAILED) {
    job->res = NULL;
    SCH_RET(job->errCode);
  }

D
dapan1121 已提交
942 943 944 945 946
  if (job->status == JOB_TASK_STATUS_SUCCEED) {
    job->res = NULL;
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
947 948
  if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
    qError("prior fetching not finished");
D
dapan1121 已提交
949
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
950 951
  }

D
dapan1121 已提交
952
  if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan 已提交
953 954
    SCH_ERR_JRET(schFetchFromRemote(job));
  }
D
dapan 已提交
955 956 957

  tsem_wait(&job->rspSem);

D
dapan 已提交
958 959 960 961 962 963 964 965
  if (job->status == JOB_TASK_STATUS_FAILED) {
    code = job->errCode;
  }
  
  if (job->res && ((SRetrieveTableRsp *)job->res)->completed) {
    job->status = JOB_TASK_STATUS_SUCCEED;
  }

D
dapan 已提交
966
  *data = job->res;
D
dapan 已提交
967
  job->res = NULL;
D
dapan 已提交
968

D
dapan 已提交
969 970 971
_return:
  atomic_val_compare_exchange_32(&job->userFetch, 1, 0);

D
dapan1121 已提交
972
  SCH_RET(code);
D
dapan 已提交
973
}
D
dapan1121 已提交
974

D
dapan1121 已提交
975 976
int32_t scheduleCancelJob(void *pJob) {
  //TODO
D
dapan1121 已提交
977

D
dapan1121 已提交
978 979
  //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST

D
dapan1121 已提交
980 981 982 983 984
  return TSDB_CODE_SUCCESS;
}

void scheduleFreeJob(void *pJob) {
  if (NULL == pJob) {
D
dapan 已提交
985 986
    return;
  }
D
dapan1121 已提交
987

D
dapan 已提交
988
  SSchJob *job = pJob;
D
dapan1121 已提交
989 990

  if (job->status > 0) {
D
dapan1121 已提交
991
    if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) {
D
dapan1121 已提交
992 993 994 995
      qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed
      return;
    }

D
dapan1121 已提交
996
    if (job->status == JOB_TASK_STATUS_EXECUTING) {
D
dapan1121 已提交
997 998
      scheduleCancelJob(pJob);
    }
D
dapan1121 已提交
999 1000

    schDropJobAllTasks(job);
D
dapan1121 已提交
1001 1002 1003
  }
  
  //TODO free job
D
dapan1121 已提交
1004 1005
}

1006
void schedulerDestroy(void) {
D
dapan1121 已提交
1007 1008 1009
  if (schMgmt.jobs) {
    taosHashCleanup(schMgmt.jobs); //TODO
    schMgmt.jobs = NULL;
1010 1011 1012
  }
}

D
dapan1121 已提交
1013