scheduler.c 25.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15
 */

D
dapan1121 已提交
16
#include "schedulerInt.h"
H
Hongze Cheng 已提交
17
#include "tmsg.h"
18
#include "query.h"
D
dapan 已提交
19
#include "catalog.h"
20 21

// Process-wide scheduler state: config (cfg), job registry (jobs), schedulerId and the
// taskId counter. Zero-initialized here; schedulerInit() fills in cfg, jobs and schedulerId.
SSchedulerMgmt schMgmt = {0};
D
dapan1121 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34


/*
 * Placeholder for building a request and sending it to the management node.
 *
 * Not implemented yet: pRpc, pMgmtEps and fp are ignored and the function
 * always returns TSDB_CODE_SUCCESS. An explicit return is required because
 * falling off the end of a non-void function makes any use of its result
 * undefined behavior.
 */
int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) {
  (void)pRpc;
  (void)pMgmtEps;
  (void)fp;

  // TODO: implement. Reference sketch of the intended flow:
  /*
  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
  if (pRequest == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SRequestMsgBody body = {0};
  buildConnectMsg(pRequest, &body);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);

  tsem_wait(&pRequest->body.rspSem);
  destroyConnectMsg(&body);

  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
    destroyRequest(pRequest);
  }
  */

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
54
/*
 * Wire up parent/child links between the tasks of a job.
 *
 * For every task of every level, looks up the tasks built for the subplan's
 * children (plan->pChildern) and parents (plan->pParents) in planToTask
 * (key: subplan pointer, value: SSchTask pointer) and appends those task
 * pointers to task->children / task->parents.
 *
 * Afterwards checks level 0: a query job may have at most one task there, and
 * the first level-0 task must have no parents.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or
 * TSDB_CODE_SCH_INTERNAL_ERROR when a relationship cannot be resolved.
 * Arrays allocated before a failure are not freed here.
 */
int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
  for (int32_t i = 0; i < job->levelNum; ++i) {
    SSchLevel *level = taosArrayGet(job->levels, i);

    for (int32_t m = 0; m < level->taskNum; ++m) {
      SSchTask *task = taosArrayGet(level->subTasks, m);
      SSubplan *plan = task->plan;
      int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
      int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;

      if (childNum > 0) {
        task->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == task->children) {
          qError("taosArrayInit %d failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan **child = taosArrayGet(plan->pChildern, n);
        SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->children, childTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        task->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == task->parents) {
          qError("taosArrayInit %d failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan **parent = taosArrayGet(plan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n indexes plan->pParents here, so report it as a parent index.
          qError("subplan relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->parents, parentTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    }
  }

  SSchLevel *level = taosArrayGet(job->levels, 0);
  if (job->attr.queryJob && level->taskNum > 1) {
    qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SSchTask *task = taosArrayGet(level->subTasks, 0);
  if (task->parents && taosArrayGetSize(task->parents) > 0) {
    qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
127
/*
 * Validate a query DAG and build the job's level/task structure from it.
 *
 * For each level in dag->pSubplans, creates one SSchLevel in job->levels and
 * one SSchTask per subplan, then links tasks via schBuildTaskRalation().
 * Also sets job->queryId, levelNum, levelIdx (the last level), subPlans and the
 * needFetch/queryJob attributes derived from the subplan types.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a malformed DAG,
 * TSDB_CODE_QRY_OUT_OF_MEMORY, or an error from schBuildTaskRalation(). On
 * failure every level, task array and link array built here is released and
 * job->levels is reset to NULL.
 */
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
  int32_t code = 0;
  // All locals used by the _return path are declared (and initialized) before
  // the first goto, so no jump can skip their initializers.
  SHashObj *planToTask = NULL;
  SSchLevel level = {0};
  SArray *levelPlans = NULL;
  int32_t levelPlanNum = 0;
  SSchLevel *pLevel = NULL;

  job->queryId = dag->queryId;

  if (dag->numOfSubplans <= 0) {
    qError("invalid subplan num:%d", dag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
  if (levelNum <= 0) {
    qError("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Maps subplan pointer -> pointer to the SSchTask built for it.
  planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == planToTask) {
    qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == job->levels) {
    qError("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->attr.needFetch = true;

  job->levelNum = levelNum;
  job->levelIdx = levelNum - 1;

  job->subPlans = dag->pSubplans;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(job->levels, &level)) {
      qError("taosArrayPush failed");
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    // job->levels was sized for levelNum entries, so element addresses stay stable.
    pLevel = taosArrayGet(job->levels, i);

    pLevel->level = i;
    levelPlans = taosArrayGetP(dag->pSubplans, i);
    if (NULL == levelPlans) {
      qError("no level plans for level %d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
    if (levelPlanNum <= 0) {
      qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = levelPlanNum;

    pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      qError("taosArrayInit %d failed", levelPlanNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < levelPlanNum; ++n) {
      SSubplan *plan = taosArrayGet(levelPlans, n);
      SSchTask task = {0};

      if (plan->type == QUERY_TYPE_MODIFY) {
        job->attr.needFetch = false;
      } else {
        job->attr.queryJob = true;
      }

      task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
      task.plan = plan;
      task.level = pLevel;
      task.status = JOB_TASK_STATUS_NOT_START;

      // subTasks was sized for levelPlanNum entries, so p stays valid after later pushes.
      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        qError("taosArrayPush failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        qError("taosHashPut failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

  SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));

  taosHashCleanup(planToTask);

  return TSDB_CODE_SUCCESS;

_return:
  // Release every level built so far. Looking only at pLevel is not enough:
  // it is NULL before the loop starts and only names the most recent level.
  if (job->levels) {
    int32_t builtLevels = (int32_t)taosArrayGetSize(job->levels);
    for (int32_t i = 0; i < builtLevels; ++i) {
      SSchLevel *pLvl = taosArrayGet(job->levels, i);
      if (NULL == pLvl->subTasks) {
        continue;
      }

      int32_t builtTasks = (int32_t)taosArrayGetSize(pLvl->subTasks);
      for (int32_t m = 0; m < builtTasks; ++m) {
        SSchTask *pTask = taosArrayGet(pLvl->subTasks, m);
        // Link arrays allocated by schBuildTaskRalation() before it failed.
        if (pTask->children) {
          taosArrayDestroy(pTask->children);
        }
        if (pTask->parents) {
          taosArrayDestroy(pTask->parents);
        }
      }

      taosArrayDestroy(pLvl->subTasks);
    }

    taosArrayDestroy(job->levels);
    job->levels = NULL;
    job->levelNum = 0;
  }

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan 已提交
248
/*
 * Append candidate execution endpoints to epSet: first the job's qnodes
 * (job->qnodeList), then the data-source endpoints recorded in job->dataSrcEps,
 * stopping when epSet->port is full.
 *
 * Does nothing if epSet already holds SCH_MAX_CONDIDATE_EP_NUM or more
 * endpoints. A NULL qnodeList is treated as empty. Every copied fqdn is
 * truncated to the destination size and always NUL-terminated.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) {
  if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
    return TSDB_CODE_SUCCESS;
  }

  const size_t fqdnSize = sizeof(epSet->fqdn[0]);
  int32_t qnodeNum = job->qnodeList ? (int32_t)taosArrayGetSize(job->qnodeList) : 0;

  for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) {
    SEpAddr *addr = taosArrayGet(job->qnodeList, i);
    char *dst = epSet->fqdn[epSet->numOfEps];

    // strncpy does not terminate when the source fills the buffer.
    strncpy(dst, addr->fqdn, fqdnSize - 1);
    dst[fqdnSize - 1] = 0;
    epSet->port[epSet->numOfEps] = addr->port;

    ++epSet->numOfEps;
  }

  for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) {
    char *dst = epSet->fqdn[epSet->numOfEps];

    strncpy(dst, job->dataSrcEps.fqdn[i], fqdnSize - 1);
    dst[fqdnSize - 1] = 0;
    epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];

    ++epSet->numOfEps;
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
273

D
dapan1121 已提交
274

D
dapan 已提交
275
/*
 * Register a launched task in job->execTasks, keyed by its taskId; the stored
 * value is the task pointer itself.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the insert fails.
 */
int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) {
  int32_t putRet = taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (putRet != 0) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
284
/*
 * Move a task from job->execTasks to job->succTasks.
 *
 * If the task cannot be removed from execTasks, a warning is logged and the
 * function returns TSDB_CODE_SUCCESS without touching *moved. Otherwise the
 * task is inserted into succTasks and *moved is set to true.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the insert fails.
 */
int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
  int32_t removeRet = taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId));
  if (removeRet != 0) {
    qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
    return TSDB_CODE_SUCCESS;
  }

  int32_t putRet = taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (putRet != 0) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
300
/*
 * Move a task into job->failTasks.
 *
 * Unlike the success path, a failed removal from job->execTasks only logs a
 * warning; the task is still inserted into failTasks and *moved is set to true.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the insert fails.
 */
int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
  int32_t removeRet = taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId));
  if (removeRet != 0) {
    qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId);
  }

  int32_t putRet = taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (putRet != 0) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;
  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
316
/*
 * Build the wire message of the given type for a task and (eventually) send it.
 *
 * SUBMIT reuses task->msg as-is. QUERY, RES_READY, FETCH and DROP_TASK build a
 * freshly calloc'd message carrying schedulerId/queryId/taskId in big-endian
 * order; QUERY additionally embeds task->msg.
 *
 * Sending is not implemented yet (see TODO), so nothing takes ownership of a
 * message allocated here. It is freed before returning; once sending is wired
 * up, ownership must be handed to the transport instead.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR for a NULL task,
 * TSDB_CODE_SCH_INTERNAL_ERROR when task->msg is missing,
 * TSDB_CODE_QRY_OUT_OF_MEMORY, or TSDB_CODE_QRY_INVALID_INPUT for an unknown
 * msgType.
 */
int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
  int32_t msgSize = 0;
  void *msg = NULL;
  bool ownMsg = false;  // true when msg was allocated here (not borrowed from task->msg)

  // Every message type dereferences task, not only FETCH.
  if (NULL == task) {
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  switch (msgType) {
    case TDMT_VND_SUBMIT: {
      if (NULL == task->msg || task->msgLen <= 0) {
        qError("submit msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      msgSize = task->msgLen;
      msg = task->msg;
      break;
    }
    case TDMT_VND_QUERY: {
      if (NULL == task->msg) {
        qError("query msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      msgSize = sizeof(SSubQueryMsg) + task->msgLen;
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      ownMsg = true;

      SSubQueryMsg *pMsg = msg;

      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      pMsg->contentLen = htonl(task->msgLen);
      memcpy(pMsg->msg, task->msg, task->msgLen);
      break;
    }
    case TDMT_VND_RES_READY: {
      msgSize = sizeof(SResReadyMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      ownMsg = true;

      SResReadyMsg *pMsg = msg;
      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    case TDMT_VND_FETCH: {
      msgSize = sizeof(SResFetchMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      ownMsg = true;

      SResFetchMsg *pMsg = msg;
      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    case TDMT_VND_DROP_TASK: {
      msgSize = sizeof(STaskDropMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      ownMsg = true;

      STaskDropMsg *pMsg = msg;
      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    default:
      qError("unknown msg type:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  //TODO SEND MSG
  //taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code);
  (void)msgSize;

  // No transport owns the message yet; release what was allocated above.
  if (ownMsg) {
    free(msg);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
409
/*
 * Decide whether a failed task should be retried.
 *
 * Currently a stub: never requests a retry (*needRetry = false) and always
 * returns TSDB_CODE_SUCCESS. job, task and errCode are not yet consulted.
 */
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
  (void)job;
  (void)task;
  (void)errCode;

  // TODO set retry or not based on task type/errCode/retry times/job status/available eps...
  // TODO if needRetry, set task retry info
  *needRetry = false;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
418

D
dapan 已提交
419
/*
 * Request the next result block from the job's fetch task.
 *
 * job->remoteFetch acts as a 0/1 flag claimed with compare-and-swap. If a
 * fetch is already outstanding, this logs and returns TSDB_CODE_SUCCESS
 * without sending. If sending fails, the flag is released and the error is
 * returned.
 */
int32_t schFetchFromRemote(SSchJob *job) {
  int32_t code = 0;

  int32_t prior = atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1);
  if (0 != prior) {
    qInfo("prior fetching not finished");
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schAsyncSendMsg(job, job->fetchTask, TDMT_VND_FETCH));
  return TSDB_CODE_SUCCESS;

_return:
  // Give the slot back so a later fetch may try again.
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
  return code;
}

D
dapan 已提交
437

D
dapan 已提交
438
/*
 * Mark the job as partially succeeded (its root level finished).
 *
 * Wakes a synchronous caller waiting on job->rspSem when the job produces no
 * fetchable result, and starts a remote fetch if a user fetch is pending
 * (job->userFetch).
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
  job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;

  bool wakeSyncCaller = job->attr.syncSchedule && !job->attr.needFetch;
  if (wakeSyncCaller) {
    tsem_post(&job->rspSem);
  }

  if (!job->userFetch) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schFetchFromRemote(job));
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
452
/*
 * Mark the job as failed with errCode.
 *
 * Clears the outstanding remote-fetch flag and posts job->rspSem if a user
 * fetch is waiting, or if a synchronous caller waits on a job with no
 * fetchable result. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
  job->status = JOB_TASK_STATUS_FAILED;
  job->errCode = errCode;

  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  bool syncNoFetch = job->attr.syncSchedule && !job->attr.needFetch;
  if (syncNoFetch || job->userFetch) {
    tsem_post(&job->rspSem);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
465
/*
 * Handle arrival of a fetched result block: clear the remote-fetch flag and
 * wake the caller waiting on job->rspSem.
 *
 * Always returns TSDB_CODE_SUCCESS. The explicit return matters because
 * callers consume the result via SCH_ERR_JRET, and falling off the end of a
 * non-void function would make that value undefined.
 */
int32_t schProcessOnDataFetched(SSchJob *job) {
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  tsem_post(&job->rspSem);

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
472
/*
 * Handle successful completion of a task.
 *
 * Moves the task to the success list. If that did not happen (the task was
 * already moved), it logs and returns. Otherwise it marks the task SUCCEED and:
 *  - root task (no parents, must be level 0): when the level requires all
 *    tasks to finish, waits until the last one does and fails the job if any
 *    failed. Otherwise it records the result endpoint in job->resEp. It then
 *    makes this task the fetch task and reports partial success for the job.
 *  - non-root task: records a data-source task's endpoint in job->dataSrcEps,
 *    informs each parent of this task's execution address, and launches
 *    parents that became ready.
 *
 * Endpoint fqdn copies are truncated to the destination and always
 * NUL-terminated.
 */
int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
  bool moved = false;

  SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
  if (!moved) {
    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
    return TSDB_CODE_SUCCESS;
  }

  task->status = JOB_TASK_STATUS_SUCCEED;

  int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0;
  if (parentNum == 0) {
    if (task->plan->level != 0) {
      qError("level error");
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    int32_t taskDone = 0;

    if (SCH_TASK_NEED_WAIT_ALL(task)) {
      int32_t taskFailed = 0;

      SCH_LOCK(SCH_WRITE, &task->level->lock);
      task->level->taskSucceed++;
      taskDone = task->level->taskSucceed + task->level->taskFailed;
      // Snapshot the failure count under the same lock as taskDone.
      taskFailed = task->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &task->level->lock);

      if (taskDone < task->level->taskNum) {
        qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }

      if (taskFailed > 0) {
        job->status = JOB_TASK_STATUS_FAILED;
        SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR));

        return TSDB_CODE_SUCCESS;
      }
    } else {
      // strncpy does not terminate when the source fills the buffer.
      strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn) - 1);
      job->resEp.fqdn[sizeof(job->resEp.fqdn) - 1] = 0;
      job->resEp.port = task->execAddr.port;
    }

    job->fetchTask = task;

    SCH_ERR_RET(schProcessOnJobPartialSuccess(job));

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
    char *dst = job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps];
    const size_t dstSize = sizeof(job->dataSrcEps.fqdn[0]);

    strncpy(dst, task->execAddr.fqdn, dstSize - 1);
    dst[dstSize - 1] = 0;
    job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;

    ++job->dataSrcEps.numOfEps;
  }

  for (int32_t i = 0; i < parentNum; ++i) {
    SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i);

    ++par->childReady;

    SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));

    if (SCH_TASK_READY_TO_LUNCH(par)) {
      SCH_ERR_RET(schLaunchTask(job, par));
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
543
/*
 * Handle failure of a task with errCode.
 *
 * If schTaskCheckAndSetRetry() asks for a retry, the task is relaunched.
 * Otherwise the task is moved to the fail list and marked FAILED. When its
 * level requires all tasks to finish, the job is only failed once every task
 * of the level is done. Otherwise the job is failed immediately with errCode.
 */
int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
  bool needRetry = false;
  bool moved = false;
  int32_t taskDone = 0;

  SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));

  if (!needRetry) {
    SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);

    SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
    if (!moved) {
      SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
    }

    // Mirror the success path, which marks finished tasks SUCCEED.
    task->status = JOB_TASK_STATUS_FAILED;

    if (SCH_TASK_NEED_WAIT_ALL(task)) {
      SCH_LOCK(SCH_WRITE, &task->level->lock);
      task->level->taskFailed++;
      taskDone = task->level->taskSucceed + task->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &task->level->lock);

      if (taskDone < task->level->taskNum) {
        qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }
    }

    job->status = JOB_TASK_STATUS_FAILED;
    SCH_ERR_RET(schProcessOnJobFailure(job, errCode));

    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schLaunchTask(job, task));

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
580
/*
 * Dispatch a response message received for a task.
 *
 * rspCode is the transport-level result. When it is non-zero, or msg is NULL,
 * the response body is not dereferenced:
 *  - SUBMIT / QUERY / RES_READY treat it as a task failure (with rspCode, or
 *    TSDB_CODE_SCH_INTERNAL_ERROR for a missing body);
 *  - FETCH fails the whole job.
 * Otherwise:
 *  - SUBMIT: accumulates affectedRows and reports task success;
 *  - QUERY: sends RES_READY for the task;
 *  - RES_READY: reports task success;
 *  - FETCH: stores the result block in the job and wakes the fetcher.
 * An error while handling a successful response is reported as a task failure.
 * If that itself fails, the job is failed.
 *
 * Unknown msgType returns TSDB_CODE_QRY_INVALID_INPUT.
 */
int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
  int32_t code = 0;
  // Transport error first; otherwise an empty body is an internal error.
  int32_t rspErr = (rspCode != TSDB_CODE_SUCCESS) ? rspCode : ((NULL == msg) ? TSDB_CODE_SCH_INTERNAL_ERROR : TSDB_CODE_SUCCESS);

  switch (msgType) {
    case TDMT_VND_SUBMIT: {
      if (rspErr != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspErr));
        break;
      }

      SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
      if (rsp->code != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
      } else {
        job->resNumOfRows += rsp->affectedRows;

        code = schProcessOnTaskSuccess(job, task);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TDMT_VND_QUERY: {
      if (rspErr != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspErr));
        break;
      }

      SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
      if (rsp->code != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
      } else {
        code = schAsyncSendMsg(job, task, TDMT_VND_RES_READY);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TDMT_VND_RES_READY: {
      if (rspErr != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspErr));
        break;
      }

      SResReadyRsp *rsp = (SResReadyRsp *)msg;
      if (rsp->code != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
      } else {
        code = schProcessOnTaskSuccess(job, task);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TDMT_VND_FETCH: {
      SCH_ERR_JRET(rspErr);
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;

      job->res = rsp;
      job->resNumOfRows = rsp->numOfRows;

      SCH_ERR_JRET(schProcessOnDataFetched(job));
      break;
    }
    default:
      qError("unknown msg type:%d received", msgType);
      return TSDB_CODE_QRY_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;

_task_error:
  SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code));
  return TSDB_CODE_SUCCESS;

_return:
  code = schProcessOnJobFailure(job, code);
  return code;
}

D
dapan 已提交
650

D
dapan1121 已提交
651 652


D
dapan 已提交
653
/*
 * Launch one task.
 *
 * Serializes the task's subplan into task->msg/msgLen and fills in execution
 * endpoints via schSetTaskExecEpSet() if the plan has none (erroring if it
 * still has none). It then sends a SUBMIT (modify plans) or QUERY message,
 * registers the task in job->execTasks and marks it EXECUTING.
 */
int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
  SSubplan *plan = task->plan;
  SEpSet *execEps = &plan->execEpSet;

  SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));

  if (execEps->numOfEps <= 0) {
    SCH_ERR_RET(schSetTaskExecEpSet(job, execEps));

    if (execEps->numOfEps <= 0) {
      SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
  }

  int32_t msgType = TDMT_VND_QUERY;
  if (QUERY_TYPE_MODIFY == plan->type) {
    msgType = TDMT_VND_SUBMIT;
  }

  SCH_ERR_RET(schAsyncSendMsg(job, task, msgType));
  SCH_ERR_RET(schPushTaskToExecList(job, task));

  task->status = JOB_TASK_STATUS_EXECUTING;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
676 677
/*
 * Start a job by launching every task of level job->levelIdx (which
 * schValidateAndBuildJob sets to the last level), then mark the job
 * EXECUTING. Stops at the first task that fails to launch.
 */
int32_t schLaunchJob(SSchJob *job) {
  SSchLevel *startLevel = taosArrayGet(job->levels, job->levelIdx);

  int32_t idx = 0;
  while (idx < startLevel->taskNum) {
    SSchTask *task = taosArrayGet(startLevel->subTasks, idx);
    SCH_ERR_RET(schLaunchTask(job, task));
    ++idx;
  }

  job->status = JOB_TASK_STATUS_EXECUTING;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
688
/*
 * Best-effort: send a DROP_TASK message for every task in job->succTasks and
 * job->failTasks. Send errors are ignored.
 */
void schDropJobAllTasks(SSchJob *job) {
  void *pIter = taosHashIterate(job->succTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK);

    pIter = taosHashIterate(job->succTasks, pIter);
  }

  pIter = taosHashIterate(job->failTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK);

    // Advance within the same hash the iterator came from.
    pIter = taosHashIterate(job->failTasks, pIter);
  }
}
707 708

/*
 * Initialize the global scheduler state.
 *
 * Copies cfg if given; otherwise sets maxJobNum to
 * SCHEDULE_DEFAULT_JOB_NUMBER. Creates the job registry hash (schMgmt.jobs)
 * and sets schMgmt.schedulerId.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  if (NULL == cfg) {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
  } else {
    schMgmt.cfg = *cfg;
  }

  schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (schMgmt.jobs == NULL) {
    SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
  }

  schMgmt.schedulerId = 1; //TODO GENERATE A UUID

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
726
/*
 * Create, register and launch a job for pDag.
 *
 * Builds the job structure, creates the exec/succ/fail task tables,
 * registers the job in schMgmt.jobs under its queryId and launches the first
 * level. On success *pJob receives the job. With syncSchedule, the call then
 * blocks on job->rspSem until the job posts it.
 *
 * qnodeList may be NULL or empty (only logged). Returns
 * TSDB_CODE_QRY_INVALID_INPUT for missing arguments or a duplicate queryId,
 * otherwise the first error encountered. On error *pJob is set to NULL and
 * the job is handed to scheduleFreeJob().
 */
int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Guard the size query: an absent qnode list is treated like an empty one.
  if (NULL == qnodeList || taosArrayGetSize(qnodeList) <= 0) {
    qInfo("qnodeList is empty");
  }

  int32_t code = 0;
  SSchJob *job = calloc(1, sizeof(SSchJob));
  if (NULL == job) {
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->attr.syncSchedule = syncSchedule;
  job->transport = transport;
  job->qnodeList = qnodeList;

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));

  job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->execTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->succTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->failTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&job->rspSem, 0, 0);

  code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      qError("taosHashPut queryId:%"PRIx64" already exist", job->queryId);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    } else {
      qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
  }

  // A non-zero status tells scheduleFreeJob() the job is now registered in schMgmt.jobs.
  job->status = JOB_TASK_STATUS_NOT_START;

  SCH_ERR_JRET(schLaunchJob(job));

  *(SSchJob **)pJob = job;

  if (syncSchedule) {
    tsem_wait(&job->rspSem);
  }

  return TSDB_CODE_SUCCESS;

_return:

  *(SSchJob **)pJob = NULL;
  scheduleFreeJob(job);

  SCH_RET(code);
}
D
dapan1121 已提交
797

D
dapan1121 已提交
798 799 800 801 802
/*
 * Execute a job synchronously and report the number of affected rows.
 *
 * Runs scheduleExecJobImpl() in blocking mode. If the job failed while the
 * caller was waiting, job->errCode is returned and *numOfRows stays 0. *pJob
 * still refers to the job in that case, so it can be freed by the caller.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT if numOfRows is NULL.
 */
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) {
  if (NULL == numOfRows) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  *numOfRows = 0;

  SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true));

  SSchJob *job = *(SSchJob **)pJob;

  // The sync wait also returns when the job fails; do not report that as success.
  if (JOB_TASK_STATUS_FAILED == job->status) {
    SCH_RET(job->errCode);
  }

  *numOfRows = job->resNumOfRows;

  return TSDB_CODE_SUCCESS;
}

/*
 * Create and launch a job without waiting for it (non-blocking variant of
 * scheduleExecJob). Returns the result of scheduleExecJobImpl().
 */
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
  const bool waitForJob = false;
  int32_t ret = scheduleExecJobImpl(transport, qnodeList, pDag, pJob, waitForJob);
  return ret;
}


D
dapan1121 已提交
815 816
int32_t scheduleFetchRows(void *pJob, void **data) {
  if (NULL == pJob || NULL == data) {
D
dapan1121 已提交
817
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
818 819
  }

D
dapan 已提交
820
  SSchJob *job = pJob;
D
dapan 已提交
821
  int32_t code = 0;
D
dapan 已提交
822

D
dapan1121 已提交
823 824 825 826 827
  if (!job->attr.needFetch) {
    qError("no need to fetch data");
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan 已提交
828 829 830 831 832
  if (job->status == JOB_TASK_STATUS_FAILED) {
    job->res = NULL;
    SCH_RET(job->errCode);
  }

D
dapan1121 已提交
833 834 835 836 837
  if (job->status == JOB_TASK_STATUS_SUCCEED) {
    job->res = NULL;
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
838 839
  if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
    qError("prior fetching not finished");
D
dapan1121 已提交
840
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
841 842
  }

D
dapan1121 已提交
843
  if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan 已提交
844 845
    SCH_ERR_JRET(schFetchFromRemote(job));
  }
D
dapan 已提交
846 847 848

  tsem_wait(&job->rspSem);

D
dapan 已提交
849 850 851 852 853 854 855 856
  if (job->status == JOB_TASK_STATUS_FAILED) {
    code = job->errCode;
  }
  
  if (job->res && ((SRetrieveTableRsp *)job->res)->completed) {
    job->status = JOB_TASK_STATUS_SUCCEED;
  }

D
dapan 已提交
857
  *data = job->res;
D
dapan 已提交
858
  job->res = NULL;
D
dapan 已提交
859

D
dapan 已提交
860 861 862
_return:
  atomic_val_compare_exchange_32(&job->userFetch, 1, 0);

D
dapan1121 已提交
863
  SCH_RET(code);
D
dapan 已提交
864
}
D
dapan1121 已提交
865

D
dapan1121 已提交
866 867
int32_t scheduleCancelJob(void *pJob) {
  //TODO
D
dapan1121 已提交
868

D
dapan1121 已提交
869 870
  //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST

D
dapan1121 已提交
871 872 873 874 875
  return TSDB_CODE_SUCCESS;
}

void scheduleFreeJob(void *pJob) {
  if (NULL == pJob) {
D
dapan 已提交
876 877
    return;
  }
D
dapan1121 已提交
878

D
dapan 已提交
879
  SSchJob *job = pJob;
D
dapan1121 已提交
880 881

  if (job->status > 0) {
D
dapan1121 已提交
882
    if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) {
D
dapan1121 已提交
883 884 885 886
      qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed
      return;
    }

D
dapan1121 已提交
887
    if (job->status == JOB_TASK_STATUS_EXECUTING) {
D
dapan1121 已提交
888 889
      scheduleCancelJob(pJob);
    }
D
dapan1121 已提交
890 891

    schDropJobAllTasks(job);
D
dapan1121 已提交
892 893 894
  }
  
  //TODO free job
D
dapan1121 已提交
895 896
}

897
void schedulerDestroy(void) {
D
dapan1121 已提交
898 899 900
  if (schMgmt.jobs) {
    taosHashCleanup(schMgmt.jobs); //TODO
    schMgmt.jobs = NULL;
901 902 903
  }
}

D
dapan1121 已提交
904