scheduler.c 22.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15
 */

D
dapan1121 已提交
16 17
#include "schedulerInt.h"
#include "taosmsg.h"
18
#include "query.h"
D
dapan 已提交
19
#include "catalog.h"
20 21

// Global scheduler state, zero-initialized here; schedulerInit() fills in cfg, jobs and schedulerId.
SSchedulerMgmt schMgmt = {0};
D
dapan1121 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53


/**
 * Placeholder: no request is built or sent yet.
 *
 * The whole body is still a commented-out draft, so none of the parameters is
 * used. The function now returns a defined value instead of falling off the
 * end of a non-void function.
 *
 * @param pRpc      unused for now
 * @param pMgmtEps  unused for now
 * @param fp        unused for now
 * @return TSDB_CODE_SUCCESS
 */
int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) {
/*
  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
  if (pRequest == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SRequestMsgBody body = {0};
  buildConnectMsg(pRequest, &body);

  int64_t transporterId = 0;
  sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);

  tsem_wait(&pRequest->body.rspSem);
  destroyConnectMsg(&body);

  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
    destroyRequest(pRequest);
  }
*/
  (void)pRpc;
  (void)pMgmtEps;
  (void)fp;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
54 55 56
/**
 * Fill in the children/parents arrays of every task in the job.
 *
 * For each task, the child and parent subplans of its plan are looked up in
 * planToTask and the matching task pointers are appended to task->children
 * and task->parents. Afterwards level 0 is validated: it must not hold more
 * than one task and that task must not have parents.
 *
 * @param job         job whose levels/subTasks are already populated
 * @param planToTask  hash keyed by subplan pointer; each value is a task pointer
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure,
 *         TSDB_CODE_SCH_INTERNAL_ERROR when a related plan has no task or
 *         level 0 is malformed
 */
int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
  for (int32_t i = 0; i < job->levelNum; ++i) {
    SQueryLevel *level = taosArrayGet(job->levels, i);

    for (int32_t m = 0; m < level->taskNum; ++m) {
      SQueryTask *task = taosArrayGet(level->subTasks, m);
      SSubplan *plan = task->plan;
      int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
      int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;

      if (childNum > 0) {
        task->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == task->children) {
          qError("taosArrayInit %d failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan *child = taosArrayGet(plan->pChildern, n);
        // The hash value is a task pointer, so the lookup yields the address of that stored pointer.
        SQueryTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          // A missing entry is the error case; a found entry is what gets linked.
          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->children, childTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        task->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == task->parents) {
          qError("taosArrayInit %d failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan *parent = taosArrayGet(plan->pParents, n);
        SQueryTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          qError("subplan relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->parents, parentTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    }
  }

  // Level 0 must consist of a single task without parents.
  SQueryLevel *level = taosArrayGet(job->levels, 0);
  if (level->taskNum > 1) {
    qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SQueryTask *task = taosArrayGet(level->subTasks, 0);
  if (task->parents && taosArrayGetSize(task->parents) > 0) {
    qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}


127
/**
 * Validate the query DAG and build the job's level/task structures from it.
 *
 * One SQueryLevel is created per entry of dag->pSubplans and one SQueryTask
 * per subplan of that level. Each task is registered in a temporary
 * plan-to-task hash, which schBuildTaskRalation() then uses to link tasks to
 * their children and parents. The hash is released before returning.
 *
 * @param dag  query DAG to validate
 * @param job  job to populate (queryId, levels, levelNum, levelIdx, subPlans)
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a malformed DAG,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by schBuildTaskRalation()
 */
int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
  int32_t code = 0;
  // Declared before the first jump to _return so the cleanup code never reads
  // an indeterminate pointer.
  SQueryLevel *pLevel = NULL;

  job->queryId = dag->queryId;

  if (dag->numOfSubplans <= 0) {
    qError("invalid subplan num:%d", dag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
  if (levelNum <= 0) {
    qError("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == planToTask) {
    qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel));
  if (NULL == job->levels) {
    qError("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->levelNum = levelNum;
  job->levelIdx = levelNum - 1;

  job->subPlans = dag->pSubplans;

  SQueryLevel level = {0};
  SArray *levelPlans = NULL;
  int32_t levelPlanNum = 0;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(job->levels, &level)) {
      qError("taosArrayPush failed");
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(job->levels, i);

    pLevel->level = i;
    levelPlans = taosArrayGetP(dag->pSubplans, i);
    if (NULL == levelPlans) {
      qError("no level plans for level %d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
    if (levelPlanNum <= 0) {
      qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = levelPlanNum;

    pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask));
    if (NULL == pLevel->subTasks) {
      qError("taosArrayInit %d failed", levelPlanNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < levelPlanNum; ++n) {
      SSubplan *plan = taosArrayGet(levelPlans, n);
      SQueryTask task = {0};

      task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
      task.plan = plan;
      task.level = pLevel;
      task.status = JOB_TASK_STATUS_NOT_START;

      // p is the pointer returned by the push; that pointer is what the hash stores.
      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        qError("taosArrayPush failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        qError("taosHashPut failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

  SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  return TSDB_CODE_SUCCESS;

_return:
  // pLevel is still NULL when the failure happened before the first level was added.
  if (pLevel && pLevel->subTasks) {
    taosArrayDestroy(pLevel->subTasks);
    pLevel->subTasks = NULL;  // do not leave a dangling pointer inside job->levels
  }

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
239
/**
 * Append candidate execution endpoints to epSet.
 *
 * Does nothing when epSet already holds SCH_MAX_CONDIDATE_EP_NUM or more
 * entries. Otherwise endpoints are appended first from job->qnodeList and
 * then from job->dataSrcEps, until epSet->port has no free slot left.
 *
 * Each fqdn copy is bounded by the destination slot and always
 * NUL-terminated.
 *
 * @param job    job providing qnodeList (may be NULL) and dataSrcEps
 * @param epSet  endpoint set to extend in place
 * @return TSDB_CODE_SUCCESS
 */
int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) {
  if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
    return TSDB_CODE_SUCCESS;
  }

  // scheduleExecJob() does not reject a NULL qnodeList, so treat it as empty.
  int32_t qnodeNum = job->qnodeList ? (int32_t)taosArrayGetSize(job->qnodeList) : 0;

  for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) {
    SEpAddr *addr = taosArrayGet(job->qnodeList, i);
    char    *dst = epSet->fqdn[epSet->numOfEps];
    size_t   dstLen = sizeof(epSet->fqdn[epSet->numOfEps]);

    strncpy(dst, addr->fqdn, dstLen - 1);
    dst[dstLen - 1] = '\0';
    epSet->port[epSet->numOfEps] = addr->port;

    ++epSet->numOfEps;
  }

  for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) {
    char  *dst = epSet->fqdn[epSet->numOfEps];
    size_t dstLen = sizeof(epSet->fqdn[epSet->numOfEps]);

    strncpy(dst, job->dataSrcEps.fqdn[i], dstLen - 1);
    dst[dstLen - 1] = '\0';
    epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];

    ++epSet->numOfEps;
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
264

D
dapan1121 已提交
265

D
dapan 已提交
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
/**
 * Register a task in job->execTasks, keyed by its taskId.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the hash
 *         insertion fails
 */
int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) {
  int32_t putRes = taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);

  if (0 == putRes) {
    return TSDB_CODE_SUCCESS;
  }

  qError("taosHashPut failed");
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);

  return TSDB_CODE_SUCCESS;
}

/**
 * Move a task from job->execTasks to job->succTasks.
 *
 * When the task cannot be removed from execTasks a warning is logged and the
 * function returns success without touching *moved.
 *
 * @param moved  set to true only after the task has been stored in succTasks
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when inserting
 *         into succTasks fails
 */
int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
  int32_t removeRes = taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId));
  if (removeRes != 0) {
    qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
    return TSDB_CODE_SUCCESS;
  }

  int32_t putRes = taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (putRes != 0) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  return TSDB_CODE_SUCCESS;
}

/**
 * Move a task from job->execTasks to job->failTasks.
 *
 * When the task cannot be removed from execTasks a warning is logged and the
 * function returns success without touching *moved.
 *
 * @param moved  set to true only after the task has been stored in failTasks
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when inserting
 *         into failTasks fails
 */
int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) {
  int32_t removeRes = taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId));
  if (removeRes != 0) {
    qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
    return TSDB_CODE_SUCCESS;
  }

  int32_t putRes = taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (putRes != 0) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
308 309 310 311 312
/**
 * Build the message of the given type for a task.
 *
 * Actual sending is still a TODO, so the message is only assembled:
 *  - SUBMIT reuses task->msg as is;
 *  - QUERY wraps task->msg in a freshly allocated SSubQueryMsg;
 *  - RES_READY / FETCH / DROP_TASK allocate a message carrying only the ids.
 * Ids and lengths are converted with htobe64()/htonl().
 *
 * Buffers allocated here are released before returning because nothing takes
 * ownership of them yet; hand the buffer over instead once sending exists.
 *
 * @param job      owning job (provides queryId)
 * @param task     task the message refers to; must not be NULL
 * @param msgType  one of the TSDB_MSG_TYPE_* values handled below
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a NULL job/task,
 *         TSDB_CODE_SCH_INTERNAL_ERROR for a missing payload or unknown type,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure
 */
int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
  int32_t msgSize = 0;
  void *msg = NULL;
  bool msgOwned = false;  // true when msg was allocated by this function

  // Every branch below dereferences both pointers.
  if (NULL == job || NULL == task) {
    qError("invalid job or task, msg type:%d", msgType);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  switch (msgType) {
    case TSDB_MSG_TYPE_SUBMIT: {
      if (NULL == task->msg || task->msgLen <= 0) {
        qError("submit msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      msgSize = task->msgLen;
      msg = task->msg;  // borrowed from the task, never freed here
      break;
    }
    case TSDB_MSG_TYPE_QUERY: {
      if (NULL == task->msg) {
        qError("query msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      msgSize = sizeof(SSubQueryMsg) + task->msgLen;
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgOwned = true;

      SSubQueryMsg *pMsg = msg;

      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      pMsg->contentLen = htonl(task->msgLen);
      memcpy(pMsg->msg, task->msg, task->msgLen);
      break;
    }
    case TSDB_MSG_TYPE_RES_READY: {
      msgSize = sizeof(SResReadyMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgOwned = true;

      SResReadyMsg *pMsg = msg;
      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    case TSDB_MSG_TYPE_FETCH: {
      msgSize = sizeof(SResFetchMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgOwned = true;

      SResFetchMsg *pMsg = msg;
      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    case TSDB_MSG_TYPE_DROP_TASK:{
      msgSize = sizeof(STaskDropMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgOwned = true;

      STaskDropMsg *pMsg = msg;
      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    default:
      // Nothing was built, so reporting success would hide the problem.
      qError("unknown msg type:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  //TODO SEND MSG
  //taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code);

  // Until the send path exists nobody else owns the buffer; release it to avoid a leak.
  if (msgOwned) {
    free(msg);
  }
  (void)msgSize;

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
398
/**
 * Decide whether a failed task should be retried.
 *
 * The retry policy is not implemented yet: *needRetry is always set to false.
 *
 * @param needRetry  output flag, always false for now
 * @return TSDB_CODE_SUCCESS
 */
int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) {
  // TODO set retry or not based on task type/errCode/retry times/job status/available eps...
  // TODO if needRetry, set task retry info
  bool retry = false;

  *needRetry = retry;

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
407

D
dapan 已提交
408
/**
 * Issue a fetch message for the job unless one is already in flight.
 *
 * job->remoteFetch is switched 0 -> 1 atomically; if it was already non-zero
 * the call is a no-op that returns success. When sending fails the flag is
 * switched back 1 -> 0 and the error code is returned.
 *
 * NOTE(review): the fetch is sent with a NULL task, while schAsyncSendMsg()
 * reads task->taskId for TSDB_MSG_TYPE_FETCH; which task should be fetched
 * from needs to be confirmed.
 */
int32_t schFetchFromRemote(SQueryJob *job) {
  int32_t code = 0;

  int32_t previous = atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1);
  if (0 != previous) {
    qInfo("prior fetching not finished");
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schAsyncSendMsg(job, NULL, TSDB_MSG_TYPE_FETCH));

  return TSDB_CODE_SUCCESS;

_return:
  // sending failed: clear the in-flight marker again
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  return code;
}

D
dapan 已提交
426 427

/**
 * Mark the job as succeeded and, if job->userFetch is set, start the remote
 * fetch.
 *
 * @return TSDB_CODE_SUCCESS, or the error reported by schFetchFromRemote()
 */
int32_t schProcessOnJobSuccess(SQueryJob *job) {
  job->status = JOB_TASK_STATUS_SUCCEED;

  if (!job->userFetch) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schFetchFromRemote(job));

  return TSDB_CODE_SUCCESS;
}

/**
 * Mark the job as failed, clear the remote-fetch flag and, if job->userFetch
 * is set, post job->rspSem.
 *
 * @return TSDB_CODE_SUCCESS
 */
int32_t schProcessOnJobFailure(SQueryJob *job) {
  job->status = JOB_TASK_STATUS_FAILED;

  // switch remoteFetch 1 -> 0 if it was set
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  if (0 != job->userFetch) {
    tsem_post(&job->rspSem);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Handle a completed fetch: clear the remote-fetch flag and post job->rspSem.
 *
 * The result is consumed through SCH_ERR_JRET() in schHandleRspMsg(), so the
 * function must return a defined value.
 *
 * @return TSDB_CODE_SUCCESS
 */
int32_t schProcessOnDataFetched(SQueryJob *job) {
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  tsem_post(&job->rspSem);

  return TSDB_CODE_SUCCESS;
}


/**
 * Handle the successful completion of a task.
 *
 * The task is moved from the exec list to the success list. Then:
 *  - A task without parents must belong to plan level 0. If
 *    SCH_TASK_NEED_WAIT_ALL() holds, the level's success counter is bumped
 *    and the job is finished only after every task of the level is done
 *    (as failed if any of them failed). Otherwise the task's exec address
 *    becomes the job's result endpoint. The job is then marked successful.
 *  - A task with parents records its exec address in job->dataSrcEps when it
 *    is a data-source task, informs every parent plan of its exec address,
 *    and launches each parent that has become ready.
 *
 * @return TSDB_CODE_SUCCESS or the first error reported by a callee
 */
int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
  bool moved = false;

  SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
  if (!moved) {
    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
    return TSDB_CODE_SUCCESS;
  }

  task->status = JOB_TASK_STATUS_SUCCEED;

  // task->parents is only allocated when the plan has parents.
  int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0;
  if (parentNum == 0) {
    if (task->plan->level != 0) {
      qError("level error");
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    int32_t taskDone = 0;

    if (SCH_TASK_NEED_WAIT_ALL(task)) {
      SCH_LOCK(SCH_WRITE, &task->level->lock);
      task->level->taskSucceed++;  // this is the success path
      taskDone = task->level->taskSucceed + task->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &task->level->lock);

      if (taskDone < task->level->taskNum) {
        qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }

      if (task->level->taskFailed > 0) {
        job->status = JOB_TASK_STATUS_FAILED;
        SCH_ERR_RET(schProcessOnJobFailure(job));

        return TSDB_CODE_SUCCESS;
      }
    } else {
      strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn));
      job->resEp.port = task->execAddr.port;
    }

    SCH_ERR_RET(schProcessOnJobSuccess(job));

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
    strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
    job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;

    ++job->dataSrcEps.numOfEps;
  }

  for (int32_t i = 0; i < parentNum; ++i) {
    // parents is created with POINTER_BYTES-sized elements, so fetch the stored pointer itself.
    SQueryTask *par = taosArrayGetP(task->parents, i);

    ++par->childReady;

    SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));

    if (SCH_TASK_READY_TO_LUNCH(par)) {
      // launch the parent that just became ready, not the finished child
      SCH_ERR_RET(schLaunchTask(job, par));
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Handle a failed task.
 *
 * If schTaskCheckAndSetRetry() asks for a retry the task is launched again.
 * Otherwise the task is moved to the fail list; when
 * SCH_TASK_NEED_WAIT_ALL() holds, the level's failure counter is bumped and
 * the function returns early until all tasks of the level are done. Finally
 * the job is marked failed and schProcessOnJobFailure() is run.
 *
 * @param errCode  error reported for the task (logged only)
 * @return TSDB_CODE_SUCCESS or the first error reported by a callee
 */
int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) {
  bool needRetry = false;
  bool moved = false;
  int32_t taskDone = 0;

  SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_RET(schLaunchTask(job, task));

    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);

  SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
  if (!moved) {
    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_TASK_NEED_WAIT_ALL(task)) {
    SCH_LOCK(SCH_WRITE, &task->level->lock);
    task->level->taskFailed++;
    taskDone = task->level->taskSucceed + task->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &task->level->lock);

    if (taskDone < task->level->taskNum) {
      qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
      return TSDB_CODE_SUCCESS;
    }
  }

  job->status = JOB_TASK_STATUS_FAILED;
  SCH_ERR_RET(schProcessOnJobFailure(job));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
563 564 565 566 567 568 569 570
/**
 * Dispatch a response for a task according to the message type.
 *
 *  - QUERY:     on success send RES_READY; on failure run task-failure handling.
 *  - RES_READY: on success run task-success handling; on failure run
 *               task-failure handling.
 *  - FETCH:     a non-success rspCode goes to job failure; otherwise run
 *               schProcessOnDataFetched().
 *
 * Errors from the success-side calls go through _task_error (task failure
 * handling). Anything routed via SCH_ERR_JRET ends in _return, which runs
 * schProcessOnJobFailure() and returns its result.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for an unknown
 *         message type, or the result of schProcessOnJobFailure()
 */
int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) {
  int32_t code = 0;

  switch (msgType) {
    case TSDB_MSG_TYPE_QUERY:
      if (TSDB_CODE_SUCCESS == rspCode) {
        code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY);
        if (code) {
          goto _task_error;
        }
      } else {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
      }
      break;
    case TSDB_MSG_TYPE_RES_READY:
      if (TSDB_CODE_SUCCESS == rspCode) {
        code = schProcessOnTaskSuccess(job, task);
        if (code) {
          goto _task_error;
        }
      } else {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
      }
      break;
    case TSDB_MSG_TYPE_FETCH:
      SCH_ERR_JRET(rspCode);
      SCH_ERR_JRET(schProcessOnDataFetched(job));
      break;
    default:
      qError("unknown msg type:%d received", msgType);
      return TSDB_CODE_QRY_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;

_task_error:
  SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code));
  return TSDB_CODE_SUCCESS;

_return:
  code = schProcessOnJobFailure(job);
  return code;
}

D
dapan 已提交
607

D
dapan1121 已提交
608 609


D
dapan1121 已提交
610
/**
 * Launch a single task.
 *
 * Steps, in order: serialize the task's plan into task->msg/msgLen; if the
 * plan has no exec endpoints, fill them via schSetTaskExecEpSet(); fail if
 * there are still none; send a SUBMIT message for QUERY_TYPE_MODIFY plans or
 * a QUERY message otherwise; add the task to the exec list; mark it as
 * executing.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR when no exec
 *         endpoint is available, or the first error reported by a callee
 */
int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
  SSubplan *plan = task->plan;

  SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));

  if (plan->execEpSet.numOfEps <= 0) {
    SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet));
  }

  // still nothing to run on after trying to fill in candidates
  if (plan->execEpSet.numOfEps <= 0) {
    SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  int32_t msgType = TSDB_MSG_TYPE_QUERY;
  if (plan->type == QUERY_TYPE_MODIFY) {
    msgType = TSDB_MSG_TYPE_SUBMIT;
  }

  SCH_ERR_RET(schAsyncSendMsg(job, task, msgType));

  SCH_ERR_RET(schPushTaskToExecList(job, task));

  task->status = JOB_TASK_STATUS_EXECUTING;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
633
/**
 * Launch every task of the level selected by job->levelIdx, then mark the
 * job as executing.
 *
 * @return TSDB_CODE_SUCCESS, or the first error reported by schLaunchTask()
 */
int32_t schLaunchJob(SQueryJob *job) {
  SQueryLevel *startLevel = taosArrayGet(job->levels, job->levelIdx);

  int32_t idx = 0;
  while (idx < startLevel->taskNum) {
    SQueryTask *curTask = taosArrayGet(startLevel->subTasks, idx);
    SCH_ERR_RET(schLaunchTask(job, curTask));
    ++idx;
  }

  job->status = JOB_TASK_STATUS_EXECUTING;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
645 646 647 648 649
void schDropJobAllTasks(SQueryJob *job) {
  void *pIter = taosHashIterate(job->succTasks, NULL);
  while (pIter) {
    SQueryTask *task = *(SQueryTask **)pIter;
  
D
dapan1121 已提交
650
    schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK);
D
dapan1121 已提交
651
    
D
dapan1121 已提交
652 653 654 655 656 657 658 659 660 661
    pIter = taosHashIterate(job->succTasks, pIter);
  }  

  pIter = taosHashIterate(job->failTasks, NULL);
  while (pIter) {
    SQueryTask *task = *(SQueryTask **)pIter;
  
    schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK);
    
    pIter = taosHashIterate(job->succTasks, pIter);
D
dapan1121 已提交
662 663
  }  
}
664 665

/**
 * Initialize the global scheduler state.
 *
 * Copies *cfg into schMgmt.cfg, or uses SCHEDULE_DEFAULT_JOB_NUMBER as
 * maxJobNum when cfg is NULL, then creates the job hash and sets the
 * scheduler id.
 *
 * @param cfg  optional configuration; may be NULL
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the job hash
 *         cannot be created
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  if (NULL == cfg) {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
  } else {
    schMgmt.cfg = *cfg;
  }

  schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == schMgmt.jobs) {
    SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
  }

  schMgmt.schedulerId = 1; //TODO GENERATE A UUID

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
683 684
/**
 * Create a job for the given query DAG and launch it.
 *
 * The job is built by schValidateAndBuildJob(), gets its exec/succ/fail task
 * hashes and response semaphore, is registered in schMgmt.jobs and finally
 * launched. On success *pJob receives the job handle; on any failure after
 * allocation *pJob is set to NULL and scheduleFreeJob() is called.
 *
 * @param transport  transport handle stored in the job; must not be NULL
 * @param qnodeList  candidate qnode list stored in the job; NULL or empty is
 *                   accepted and only logged
 * @param pDag       query DAG; pDag and pDag->pSubplans must not be NULL
 * @param pJob       output location for the job handle; must not be NULL
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for bad arguments,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure,
 *         TSDB_CODE_SCH_INTERNAL_ERROR when the job cannot be registered, or
 *         an error from building/launching the job
 */
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // A NULL list is treated like an empty one; do not query its size.
  if (NULL == qnodeList || taosArrayGetSize(qnodeList) <= 0) {
    qInfo("qnodeList is empty");
  }

  int32_t code = 0;
  SQueryJob *job = calloc(1, sizeof(SQueryJob));
  if (NULL == job) {
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->transport = transport;
  job->qnodeList = qnodeList;

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));

  job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->execTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->succTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->failTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&job->rspSem, 0, 0);

  if (0 != taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) {
    qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  job->status = JOB_TASK_STATUS_NOT_START;

  SCH_ERR_JRET(schLaunchJob(job));

  *(SQueryJob **)pJob = job;

  return TSDB_CODE_SUCCESS;

_return:

  *(SQueryJob **)pJob = NULL;
  scheduleFreeJob(job);

  SCH_RET(code);
}
D
dapan1121 已提交
743

D
dapan1121 已提交
744 745
int32_t scheduleFetchRows(void *pJob, void **data) {
  if (NULL == pJob || NULL == data) {
D
dapan 已提交
746 747 748 749
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQueryJob *job = pJob;
D
dapan 已提交
750
  int32_t code = 0;
D
dapan 已提交
751 752 753 754 755 756

  if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
    qError("prior fetching not finished");
    return TSDB_CODE_QRY_APP_ERROR;
  }

D
dapan1121 已提交
757
  if (job->status == JOB_TASK_STATUS_SUCCEED) {
D
dapan 已提交
758 759
    SCH_ERR_JRET(schFetchFromRemote(job));
  }
D
dapan 已提交
760 761 762 763

  tsem_wait(&job->rspSem);

  *data = job->res;
D
dapan 已提交
764
  job->res = NULL;
D
dapan 已提交
765

D
dapan 已提交
766 767 768 769
_return:
  atomic_val_compare_exchange_32(&job->userFetch, 1, 0);

  return code;
D
dapan 已提交
770
}
D
dapan1121 已提交
771

D
dapan1121 已提交
772 773
int32_t scheduleCancelJob(void *pJob) {
  //TODO
D
dapan1121 已提交
774

D
dapan1121 已提交
775 776
  //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST

D
dapan1121 已提交
777 778 779 780 781
  return TSDB_CODE_SUCCESS;
}

void scheduleFreeJob(void *pJob) {
  if (NULL == pJob) {
D
dapan 已提交
782 783
    return;
  }
D
dapan1121 已提交
784

D
dapan1121 已提交
785 786 787
  SQueryJob *job = pJob;

  if (job->status > 0) {
D
dapan1121 已提交
788
    if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) {
D
dapan1121 已提交
789 790 791 792
      qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed
      return;
    }

D
dapan1121 已提交
793
    if (job->status == JOB_TASK_STATUS_EXECUTING) {
D
dapan1121 已提交
794 795
      scheduleCancelJob(pJob);
    }
D
dapan1121 已提交
796 797

    schDropJobAllTasks(job);
D
dapan1121 已提交
798 799 800
  }
  
  //TODO free job
D
dapan1121 已提交
801 802
}

803
void schedulerDestroy(void) {
D
dapan1121 已提交
804 805 806
  if (schMgmt.jobs) {
    taosHashCleanup(schMgmt.jobs); //TODO
    schMgmt.jobs = NULL;
807 808 809
  }
}

D
dapan1121 已提交
810