scheduler.c 25.3 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

D
dapan1121 已提交
16 17
#include "schedulerInt.h"
#include "taosmsg.h"
18
#include "query.h"
D
dapan 已提交
19
#include "catalog.h"
20 21

/*
 * Process-wide scheduler state (cfg, jobs registry, schedulerId, taskId counter).
 * Zero-initialised here; cfg, jobs and schedulerId are set by schedulerInit(),
 * taskId is advanced atomically for every task created.
 */
SSchedulerMgmt schMgmt = {0};
D
dapan1121 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53


/*
 * Build a request and send it to the management endpoints pMgmtEps.
 *
 * NOTE(review): not implemented yet. The intended flow is sketched in the
 * comment below (it mirrors the client connect path and does not compile as
 * is). Until it is implemented this stub does nothing and reports success.
 * Previously control fell off the end of this non-void function, which is
 * undefined behaviour whenever a caller uses the result.
 *
 * Returns TSDB_CODE_SUCCESS.
 */
int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) {
  (void)pRpc;
  (void)pMgmtEps;
  (void)fp;

/*
  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
  if (pRequest == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SRequestMsgBody body = {0};
  buildConnectMsg(pRequest, &body);

  int64_t transporterId = 0;
  sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);

  tsem_wait(&pRequest->body.rspSem);
  destroyConnectMsg(&body);

  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
    destroyRequest(pRequest);
  }
*/

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
54
int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
D
dapan1121 已提交
55
  for (int32_t i = 0; i < job->levelNum; ++i) {
D
dapan 已提交
56
    SSchLevel *level = taosArrayGet(job->levels, i);
D
dapan 已提交
57
    
D
dapan1121 已提交
58
    for (int32_t m = 0; m < level->taskNum; ++m) {
D
dapan 已提交
59
      SSchTask *task = taosArrayGet(level->subTasks, m);
D
dapan1121 已提交
60
      SSubplan *plan = task->plan;
D
dapan1121 已提交
61 62
      int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
      int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
D
dapan1121 已提交
63 64

      if (childNum > 0) {
D
dapan 已提交
65 66
        task->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == task->children) {
D
dapan1121 已提交
67 68 69 70 71 72
          qError("taosArrayInit %d failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
D
dapan 已提交
73
        SSubplan **child = taosArrayGet(plan->pChildern, n);
D
dapan 已提交
74
        SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
D
dapan 已提交
75
        if (NULL == childTask || NULL == *childTask) {
D
dapan1121 已提交
76 77 78 79
          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

D
dapan 已提交
80
        if (NULL == taosArrayPush(task->children, childTask)) {
D
dapan1121 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        task->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == task->parents) {
          qError("taosArrayInit %d failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
D
dapan 已提交
95
        SSubplan **parent = taosArrayGet(plan->pParents, n);
D
dapan 已提交
96
        SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES);
D
dapan 已提交
97
        if (NULL == parentTask || NULL == *parentTask) {
D
dapan1121 已提交
98 99 100 101
          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

D
dapan 已提交
102
        if (NULL == taosArrayPush(task->parents, parentTask)) {
D
dapan1121 已提交
103 104 105 106 107 108 109
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }      
    }
  }

D
dapan 已提交
110 111
  SSchLevel *level = taosArrayGet(job->levels, 0);
  if (job->attr.queryJob && level->taskNum > 1) {
D
dapan 已提交
112 113 114 115
    qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

D
dapan 已提交
116
  SSchTask *task = taosArrayGet(level->subTasks, 0);
D
dapan 已提交
117 118 119 120 121 122
  if (task->parents && taosArrayGetSize(task->parents) > 0) {
    qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }


D
dapan1121 已提交
123 124 125 126
  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
127
/*
 * Release everything schValidateAndBuildJob / schBuildTaskRalation attached to
 * job->levels. That covers each task's children/parents arrays, each level's
 * subTasks array, and the levels array itself.
 *
 * Safe on a partially built job: only the levels and tasks actually pushed
 * are visited. job->levels is reset to NULL and job->levelNum to 0.
 */
static void schCleanupJobLevels(SSchJob *job) {
  if (NULL == job->levels) {
    return;
  }

  int32_t levelNum = (int32_t)taosArrayGetSize(job->levels);
  for (int32_t i = 0; i < levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(job->levels, i);
    if (NULL == pLevel->subTasks) {
      continue;
    }

    int32_t taskNum = (int32_t)taosArrayGetSize(pLevel->subTasks);
    for (int32_t m = 0; m < taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      if (pTask->children) {
        taosArrayDestroy(pTask->children);
        pTask->children = NULL;
      }
      if (pTask->parents) {
        taosArrayDestroy(pTask->parents);
        pTask->parents = NULL;
      }
    }

    taosArrayDestroy(pLevel->subTasks);
    pLevel->subTasks = NULL;
  }

  taosArrayDestroy(job->levels);
  job->levels = NULL;
  job->levelNum = 0;
}

/*
 * Validate dag and build the job's execution structure.
 *
 * The dag provides one SSchLevel per level and one SSchTask per subplan.
 * Tasks are linked to each other through schBuildTaskRalation.
 *
 * Also sets job->queryId, levelNum, levelIdx (the last level), subPlans and
 * attr. attr.needFetch is cleared if any subplan is QUERY_TYPE_MODIFY;
 * attr.queryJob is set if any is not.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a malformed dag,
 * or an out-of-memory / internal error code. On failure everything allocated
 * here is released and job->levels is NULL. (Previously the error path could
 * dereference a NULL pLevel and freed only the last level's tasks.)
 */
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
  int32_t code = 0;
  SHashObj *planToTask = NULL;

  job->queryId = dag->queryId;

  if (dag->numOfSubplans <= 0) {
    qError("invalid subplan num:%d", dag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
  if (levelNum <= 0) {
    qError("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // maps SSubplan* -> SSchTask*, used only while linking tasks below
  planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == planToTask) {
    qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == job->levels) {
    qError("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->attr.needFetch = true;

  job->levelNum = levelNum;
  job->levelIdx = levelNum - 1;

  job->subPlans = dag->pSubplans;

  SSchLevel level = {0};
  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(job->levels, &level)) {
      qError("taosArrayPush failed");
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    // NOTE(review): pLevel (and the task pointers stored in planToTask below)
    // point into arrays sized up front with taosArrayInit; this assumes those
    // arrays never reallocate while exactly that many elements are pushed.
    SSchLevel *pLevel = taosArrayGet(job->levels, i);
    pLevel->level = i;

    SArray *levelPlans = taosArrayGetP(dag->pSubplans, i);
    if (NULL == levelPlans) {
      qError("no level plans for level %d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    int32_t levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
    if (levelPlanNum <= 0) {
      qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = levelPlanNum;

    pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      qError("taosArrayInit %d failed", levelPlanNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < levelPlanNum; ++n) {
      SSubplan *plan = taosArrayGet(levelPlans, n);
      SSchTask task = {0};

      if (plan->type == QUERY_TYPE_MODIFY) {
        job->attr.needFetch = false;
      } else {
        job->attr.queryJob = true;
      }

      task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
      task.plan = plan;
      task.level = pLevel;
      task.status = JOB_TASK_STATUS_NOT_START;

      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        qError("taosArrayPush failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        qError("taosHashPut failed");
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

  SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));

  taosHashCleanup(planToTask);

  return TSDB_CODE_SUCCESS;

_return:
  schCleanupJobLevels(job);

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan 已提交
248
/*
 * Append candidate execution endpoints to epSet until it is full.
 *
 * The job's qnodes come first, followed by the endpoints recorded in
 * job->dataSrcEps. Does nothing when epSet already holds
 * SCH_MAX_CONDIDATE_EP_NUM or more endpoints.
 *
 * Each fqdn copy is bounded by the destination slot and always
 * NUL-terminated. The previous strncpy was bounded by the source size and
 * could leave the slot unterminated.
 *
 * Returns TSDB_CODE_SUCCESS.
 */
int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) {
  if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
    return TSDB_CODE_SUCCESS;
  }

  const size_t fqdnSize = sizeof(epSet->fqdn[0]);
  int32_t qnodeNum = (int32_t)taosArrayGetSize(job->qnodeList);

  for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) {
    SEpAddr *addr = taosArrayGet(job->qnodeList, i);
    char *dst = epSet->fqdn[epSet->numOfEps];

    strncpy(dst, addr->fqdn, fqdnSize - 1);
    dst[fqdnSize - 1] = 0;
    epSet->port[epSet->numOfEps] = addr->port;

    ++epSet->numOfEps;
  }

  for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) {
    char *dst = epSet->fqdn[epSet->numOfEps];

    strncpy(dst, job->dataSrcEps.fqdn[i], fqdnSize - 1);
    dst[fqdnSize - 1] = 0;
    epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];

    ++epSet->numOfEps;
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
273

D
dapan1121 已提交
274

D
dapan 已提交
275
/*
 * Record task as executing: store the SSchTask* in job->execTasks under its
 * taskId. Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the
 * insert fails.
 */
int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) {
  int32_t putRet = taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES);
  if (putRet != 0) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
284
/*
 * Move task from job->execTasks into job->succTasks (both keyed by taskId).
 *
 * If removing the task from execTasks fails (presumably it was already
 * moved), the call only logs a warning. It then leaves *moved untouched and
 * returns TSDB_CODE_SUCCESS.
 *
 * Otherwise *moved is set to true once the task is stored in succTasks. A
 * failed insert returns TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
  void *key = &task->taskId;
  size_t keyLen = sizeof(task->taskId);

  bool removed = (0 == taosHashRemove(job->execTasks, key, keyLen));
  if (!removed) {
    qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
    return TSDB_CODE_SUCCESS;
  }

  if (0 != taosHashPut(job->succTasks, key, keyLen, &task, POINTER_BYTES)) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
300
/*
 * Move task from job->execTasks into job->failTasks (both keyed by taskId).
 *
 * Unlike the success path, a failed removal from execTasks is only warned
 * about; the task is still inserted into failTasks.
 *
 * *moved is set to true on success. A failed insert returns
 * TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
  void *key = &task->taskId;
  size_t keyLen = sizeof(task->taskId);

  bool removed = (0 == taosHashRemove(job->execTasks, key, keyLen));
  if (!removed) {
    qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId);
  }

  bool inserted = (0 == taosHashPut(job->failTasks, key, keyLen, &task, POINTER_BYTES));
  if (!inserted) {
    qError("taosHashPut failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;
  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
316
/*
 * Build the message of type msgType for task and hand it to the transport.
 *
 * SUBMIT reuses task->msg/task->msgLen as is. QUERY allocates an SSubQueryMsg
 * header followed by a copy of task->msg. RES_READY, FETCH and DROP_TASK
 * allocate a fixed-size message. Id fields are written with htobe64/htonl.
 *
 * NOTE(review): sending is not implemented yet (see the TODO below). Until it
 * is, buffers allocated here are freed before returning instead of leaking on
 * every call. When the transport takes ownership of msg, remove that free.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code in these cases:
 *   - TSDB_CODE_QRY_APP_ERROR: task is NULL (now checked for every type,
 *     not only FETCH);
 *   - TSDB_CODE_SCH_INTERNAL_ERROR: the task message is missing;
 *   - TSDB_CODE_QRY_OUT_OF_MEMORY: allocation failed;
 *   - TSDB_CODE_QRY_INVALID_INPUT: msgType is unknown (previously reported
 *     as success).
 */
int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
  int32_t msgSize = 0;
  void *msg = NULL;
  bool ownMsg = true;  // false when msg aliases task->msg, which must not be freed here

  if (NULL == task) {
    qError("task is NULL, msg type:%d", msgType);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  switch (msgType) {
    case TSDB_MSG_TYPE_SUBMIT: {
      if (NULL == task->msg || task->msgLen <= 0) {
        qError("submit msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      msgSize = task->msgLen;
      msg = task->msg;
      ownMsg = false;
      break;
    }
    case TSDB_MSG_TYPE_QUERY: {
      if (NULL == task->msg) {
        qError("query msg is NULL");
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      msgSize = sizeof(SSubQueryMsg) + task->msgLen;
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSubQueryMsg *pMsg = msg;

      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      pMsg->contentLen = htonl(task->msgLen);
      memcpy(pMsg->msg, task->msg, task->msgLen);
      break;
    }
    case TSDB_MSG_TYPE_RES_READY: {
      msgSize = sizeof(SResReadyMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResReadyMsg *pMsg = msg;
      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    case TSDB_MSG_TYPE_FETCH: {
      msgSize = sizeof(SResFetchMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResFetchMsg *pMsg = msg;
      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    case TSDB_MSG_TYPE_DROP_TASK: {
      msgSize = sizeof(STaskDropMsg);
      msg = calloc(1, msgSize);
      if (NULL == msg) {
        qError("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      STaskDropMsg *pMsg = msg;
      pMsg->schedulerId = htobe64(schMgmt.schedulerId);
      pMsg->queryId = htobe64(job->queryId);
      pMsg->taskId = htobe64(task->taskId);
      break;
    }
    default:
      qError("unknown msg type:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  //TODO SEND MSG
  //taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code);
  (void)msgSize;

  // nothing takes ownership of msg yet, so release what was allocated above
  if (ownMsg) {
    free(msg);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
410
/*
 * Decide whether task should be retried after failing with errCode.
 *
 * Currently a placeholder: it always answers "no retry" by setting
 * *needRetry to false and returning TSDB_CODE_SUCCESS.
 *
 * TODO set retry or not based on task type/errCode/retry times/job status/available eps...
 * TODO if needRetry, set task retry info
 */
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
  (void)job;
  (void)task;
  (void)errCode;

  *needRetry = false;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
419

D
dapan 已提交
420
/*
 * Send a FETCH message for job->fetchTask unless a remote fetch is already
 * in flight.
 *
 * job->remoteFetch works as a 0/1 flag. It is claimed here with a
 * compare-and-swap 0 -> 1. If the message cannot be issued it is rolled back
 * to 0 here; otherwise schProcessOnDataFetched / schProcessOnJobFailure reset
 * it later.
 *
 * Returns TSDB_CODE_SUCCESS (also when a fetch is already in flight), or the
 * error from schAsyncSendMsg.
 */
int32_t schFetchFromRemote(SSchJob *job) {
  int32_t code = 0;

  int32_t prevFlag = atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1);
  if (0 != prevFlag) {
    qInfo("prior fetching not finished");
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schAsyncSendMsg(job, job->fetchTask, TSDB_MSG_TYPE_FETCH));
  return TSDB_CODE_SUCCESS;

_return:
  // release the flag so a later call can try again
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
  return code;
}

D
dapan 已提交
438

D
dapan 已提交
439
/*
 * Mark job as JOB_TASK_STATUS_PARTIAL_SUCCEED once its level-0 work is done.
 *
 * A synchronously scheduled job that needs no fetch is woken via
 * job->rspSem. If the user is already waiting in a fetch (job->userFetch),
 * the remote fetch is started right away.
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
  job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;

  bool syncWithoutFetch = job->attr.syncSchedule && !job->attr.needFetch;
  if (syncWithoutFetch) {
    tsem_post(&job->rspSem);
  }

  if (!job->userFetch) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schFetchFromRemote(job));
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
453
/*
 * Mark job as failed with errCode and reset the remote-fetch flag (1 -> 0).
 *
 * Anyone blocked on job->rspSem is woken. That is either a user fetch in
 * progress, or a synchronously scheduled job that needs no fetch.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
  job->status = JOB_TASK_STATUS_FAILED;
  job->errCode = errCode;

  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  bool wakeWaiter = job->userFetch || (job->attr.syncSchedule && !job->attr.needFetch);
  if (wakeWaiter) {
    tsem_post(&job->rspSem);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
466
/*
 * Called after a fetch response has been stored in job->res.
 *
 * Resets the remote-fetch flag (1 -> 0) and wakes the waiter blocked on
 * job->rspSem.
 *
 * Always returns TSDB_CODE_SUCCESS. The function previously had no return
 * statement, so callers checking its result read an indeterminate value
 * (undefined behaviour).
 */
int32_t schProcessOnDataFetched(SSchJob *job) {
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  tsem_post(&job->rspSem);

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
473
/*
 * Handle a successfully finished task.
 *
 * The task is moved to the success list; if it was already moved, nothing
 * else happens.
 *
 * A task without parents must be on level 0. It becomes job->fetchTask and
 * the job is marked partially succeeded. When the level must wait for all of
 * its tasks (SCH_TASK_NEED_WAIT_ALL), that only happens after every task has
 * finished and none failed. Otherwise the task's address becomes job->resEp.
 *
 * A task with parents records its address in job->dataSrcEps if it is a
 * data-source task. It then tells each parent where it ran, and launches
 * parents that become ready.
 *
 * Changes vs. previous version:
 *   - the failed-task counter is read under the level lock;
 *   - fqdn copies are bounded by the destination and NUL-terminated;
 *   - the dataSrcEps index is also bounded by the real array length.
 */
int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
  bool moved = false;

  SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
  if (!moved) {
    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
    return TSDB_CODE_SUCCESS;
  }

  task->status = JOB_TASK_STATUS_SUCCEED;

  int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0;
  if (parentNum == 0) {
    if (task->plan->level != 0) {
      qError("level error");
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    if (SCH_TASK_NEED_WAIT_ALL(task)) {
      int32_t taskDone = 0;
      int32_t taskFailed = 0;

      // snapshot both counters under the same lock the failure path uses
      SCH_LOCK(SCH_WRITE, &task->level->lock);
      task->level->taskSucceed++;
      taskFailed = task->level->taskFailed;
      taskDone = task->level->taskSucceed + taskFailed;
      SCH_UNLOCK(SCH_WRITE, &task->level->lock);

      if (taskDone < task->level->taskNum) {
        qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }

      if (taskFailed > 0) {
        SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR));

        return TSDB_CODE_SUCCESS;
      }
    } else {
      strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn) - 1);
      job->resEp.fqdn[sizeof(job->resEp.fqdn) - 1] = 0;
      job->resEp.port = task->execAddr.port;
    }

    job->fetchTask = task;

    SCH_ERR_RET(schProcessOnJobPartialSuccess(job));

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM &&
      job->dataSrcEps.numOfEps < tListLen(job->dataSrcEps.port)) {
    char *dst = job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps];
    size_t dstSize = sizeof(job->dataSrcEps.fqdn[0]);

    strncpy(dst, task->execAddr.fqdn, dstSize - 1);
    dst[dstSize - 1] = 0;
    job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;

    ++job->dataSrcEps.numOfEps;
  }

  for (int32_t i = 0; i < parentNum; ++i) {
    SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i);

    ++par->childReady;

    SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));

    if (SCH_TASK_READY_TO_LUNCH(par)) {
      SCH_ERR_RET(schLaunchTask(job, par));
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
544
/*
 * Handle a task that failed with errCode.
 *
 * If schTaskCheckAndSetRetry asks for a retry, the task is simply launched
 * again. Otherwise the task is moved to the fail list. For levels that wait
 * for all tasks, the job is only failed once every task of the level is done.
 * In all other cases the job is failed immediately with errCode.
 */
int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
  bool needRetry = false;
  SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_RET(schLaunchTask(job, task));
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);

  bool moved = false;
  SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
  if (!moved) {
    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
  }

  if (SCH_TASK_NEED_WAIT_ALL(task)) {
    int32_t taskDone = 0;

    SCH_LOCK(SCH_WRITE, &task->level->lock);
    task->level->taskFailed++;
    taskDone = task->level->taskSucceed + task->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &task->level->lock);

    if (taskDone < task->level->taskNum) {
      qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
      return TSDB_CODE_SUCCESS;
    }
  }

  job->status = JOB_TASK_STATUS_FAILED;
  SCH_ERR_RET(schProcessOnJobFailure(job, errCode));

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
581
/*
 * Dispatch a response of type msgType received for task.
 *
 * rspCode is the transport-level result. For SUBMIT, QUERY and RES_READY, a
 * failing rspCode or a missing body is now treated as a task failure.
 * Previously rspCode was ignored for these types and msg was dereferenced
 * unconditionally. Otherwise the code carried in the response body decides
 * between task failure and success.
 *
 * For FETCH, the result is stored in job->res / job->resNumOfRows and the
 * fetch waiter is woken.
 *
 * Errors raised while handling a task failure, and FETCH errors, fail the
 * whole job via schProcessOnJobFailure.
 */
int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
  int32_t code = 0;

  (void)msgSize;

  switch (msgType) {
    case TSDB_MSG_TYPE_SUBMIT: {
      SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
      int32_t taskCode = rspCode;
      if (TSDB_CODE_SUCCESS == taskCode) {
        taskCode = (NULL == rsp) ? TSDB_CODE_QRY_INVALID_INPUT : rsp->code;
      }

      if (taskCode != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, taskCode));
      } else {
        job->resNumOfRows += rsp->affectedRows;

        code = schProcessOnTaskSuccess(job, task);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TSDB_MSG_TYPE_QUERY: {
      SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
      int32_t taskCode = rspCode;
      if (TSDB_CODE_SUCCESS == taskCode) {
        taskCode = (NULL == rsp) ? TSDB_CODE_QRY_INVALID_INPUT : rsp->code;
      }

      if (taskCode != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, taskCode));
      } else {
        code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TSDB_MSG_TYPE_RES_READY: {
      SResReadyRsp *rsp = (SResReadyRsp *)msg;
      int32_t taskCode = rspCode;
      if (TSDB_CODE_SUCCESS == taskCode) {
        taskCode = (NULL == rsp) ? TSDB_CODE_QRY_INVALID_INPUT : rsp->code;
      }

      if (taskCode != TSDB_CODE_SUCCESS) {
        SCH_ERR_JRET(schProcessOnTaskFailure(job, task, taskCode));
      } else {
        code = schProcessOnTaskSuccess(job, task);
        if (code) {
          goto _task_error;
        }
      }
      break;
    }
    case TSDB_MSG_TYPE_FETCH: {
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        qError("fetch rsp msg is NULL");
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;

      job->res = rsp;
      job->resNumOfRows = rsp->numOfRows;

      SCH_ERR_JRET(schProcessOnDataFetched(job));
      break;
    }
    default:
      qError("unknown msg type:%d received", msgType);
      return TSDB_CODE_QRY_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;

_task_error:
  SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code));
  return TSDB_CODE_SUCCESS;

_return:
  code = schProcessOnJobFailure(job, code);
  return code;
}

D
dapan 已提交
651

D
dapan1121 已提交
652 653


D
dapan 已提交
654
/*
 * Launch one task.
 *
 * Steps: serialize its subplan into task->msg, make sure the subplan has at
 * least one execution endpoint, send SUBMIT (modify plans) or QUERY, then
 * register the task in the exec list and mark it EXECUTING.
 *
 * When the planner set no endpoint, candidates come from
 * schSetTaskExecEpSet. If there is still none, TSDB_CODE_SCH_INTERNAL_ERROR
 * is returned.
 */
int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
  SSubplan *plan = task->plan;
  SEpSet *execEps = &plan->execEpSet;

  SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));

  if (execEps->numOfEps <= 0) {
    SCH_ERR_RET(schSetTaskExecEpSet(job, execEps));
  }

  if (execEps->numOfEps <= 0) {
    SCH_TASK_ERR_LOG("invalid execEpSet num:%d", execEps->numOfEps);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  int32_t msgType = TSDB_MSG_TYPE_QUERY;
  if (QUERY_TYPE_MODIFY == plan->type) {
    msgType = TSDB_MSG_TYPE_SUBMIT;
  }

  SCH_ERR_RET(schAsyncSendMsg(job, task, msgType));
  SCH_ERR_RET(schPushTaskToExecList(job, task));

  task->status = JOB_TASK_STATUS_EXECUTING;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
677 678
/*
 * Start the job.
 *
 * Every task of the level at job->levelIdx is launched (schValidateAndBuildJob
 * sets that index to the last level), then the job is marked EXECUTING.
 * Stops at the first task that fails to launch and returns its error.
 */
int32_t schLaunchJob(SSchJob *job) {
  SSchLevel *startLevel = taosArrayGet(job->levels, job->levelIdx);

  int32_t taskIdx = 0;
  while (taskIdx < startLevel->taskNum) {
    SSchTask *task = taosArrayGet(startLevel->subTasks, taskIdx);
    SCH_ERR_RET(schLaunchTask(job, task));
    ++taskIdx;
  }

  job->status = JOB_TASK_STATUS_EXECUTING;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
689
void schDropJobAllTasks(SSchJob *job) {
D
dapan1121 已提交
690 691
  void *pIter = taosHashIterate(job->succTasks, NULL);
  while (pIter) {
D
dapan 已提交
692
    SSchTask *task = *(SSchTask **)pIter;
D
dapan1121 已提交
693
  
D
dapan1121 已提交
694
    schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK);
D
dapan1121 已提交
695
    
D
dapan1121 已提交
696 697 698 699 700
    pIter = taosHashIterate(job->succTasks, pIter);
  }  

  pIter = taosHashIterate(job->failTasks, NULL);
  while (pIter) {
D
dapan 已提交
701
    SSchTask *task = *(SSchTask **)pIter;
D
dapan1121 已提交
702 703 704 705
  
    schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK);
    
    pIter = taosHashIterate(job->succTasks, pIter);
D
dapan1121 已提交
706 707
  }  
}
708 709

/*
 * Initialise the global scheduler state.
 *
 * cfg is copied when given. Otherwise only maxJobNum is set, to
 * SCHEDULE_DEFAULT_JOB_NUMBER. The job registry hash is then created and
 * schedulerId is assigned.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the registry
 * cannot be created.
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  if (NULL == cfg) {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
  } else {
    schMgmt.cfg = *cfg;
  }

  schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (schMgmt.jobs == NULL) {
    SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
  }

  schMgmt.schedulerId = 1; //TODO GENERATE A UUID
  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
727
/*
 * Create, register and launch a job for pDag.
 *
 * transport, pDag, pDag->pSubplans and pJob must be non-NULL; an empty
 * qnodeList is allowed and only logged. The job is built from the dag, its
 * exec/succ/fail task maps are created, and it is registered in schMgmt.jobs
 * under its queryId. Then its first level is launched.
 *
 * On success *pJob receives the job. With syncSchedule the call then blocks
 * on job->rspSem until the job signals it.
 *
 * On failure *pJob is set to NULL and the job is handed to scheduleFreeJob.
 * (The argument check previously tested `NULL == transport` twice.)
 */
int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (taosArrayGetSize(qnodeList) <= 0) {
    qInfo("qnodeList is empty");
  }

  int32_t code = 0;
  SSchJob *job = calloc(1, sizeof(SSchJob));
  if (NULL == job) {
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->attr.syncSchedule = syncSchedule;
  job->transport = transport;
  job->qnodeList = qnodeList;

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));

  job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->execTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->succTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == job->failTasks) {
    qError("taosHashInit %d failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&job->rspSem, 0, 0);

  code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      qError("taosHashPut queryId:%"PRIx64" already exist", job->queryId);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    } else {
      qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
  }

  // status is only set once the job is registered; scheduleFreeJob relies on
  // it to decide whether the job must be removed from schMgmt.jobs
  job->status = JOB_TASK_STATUS_NOT_START;

  SCH_ERR_JRET(schLaunchJob(job));

  *(SSchJob **)pJob = job;

  if (syncSchedule) {
    tsem_wait(&job->rspSem);
  }

  return TSDB_CODE_SUCCESS;

_return:
  *(SSchJob **)pJob = NULL;
  scheduleFreeJob(job);

  SCH_RET(code);
}
D
dapan1121 已提交
798

D
dapan1121 已提交
799 800 801 802 803
/*
 * Run pDag synchronously.
 *
 * On success *pJob holds the job and *numOfRows its resNumOfRows. *numOfRows
 * is reset to 0 before anything else, so it stays 0 on failure.
 */
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) {
  *numOfRows = 0;

  SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true));

  *numOfRows = (*(SSchJob **)pJob)->resNumOfRows;
  return TSDB_CODE_SUCCESS;
}

/* Asynchronous variant of scheduleExecJob: launches the job without waiting on it. */
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
  const bool waitForJob = false;
  return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, waitForJob);
}


D
dapan1121 已提交
816 817
int32_t scheduleFetchRows(void *pJob, void **data) {
  if (NULL == pJob || NULL == data) {
D
dapan1121 已提交
818
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
819 820
  }

D
dapan 已提交
821
  SSchJob *job = pJob;
D
dapan 已提交
822
  int32_t code = 0;
D
dapan 已提交
823

D
dapan1121 已提交
824 825 826 827 828
  if (!job->attr.needFetch) {
    qError("no need to fetch data");
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan 已提交
829 830 831 832 833
  if (job->status == JOB_TASK_STATUS_FAILED) {
    job->res = NULL;
    SCH_RET(job->errCode);
  }

D
dapan1121 已提交
834 835 836 837 838
  if (job->status == JOB_TASK_STATUS_SUCCEED) {
    job->res = NULL;
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
839 840
  if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
    qError("prior fetching not finished");
D
dapan1121 已提交
841
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
842 843
  }

D
dapan1121 已提交
844
  if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan 已提交
845 846
    SCH_ERR_JRET(schFetchFromRemote(job));
  }
D
dapan 已提交
847 848 849

  tsem_wait(&job->rspSem);

D
dapan 已提交
850 851 852 853 854 855 856 857
  if (job->status == JOB_TASK_STATUS_FAILED) {
    code = job->errCode;
  }
  
  if (job->res && ((SRetrieveTableRsp *)job->res)->completed) {
    job->status = JOB_TASK_STATUS_SUCCEED;
  }

D
dapan 已提交
858
  *data = job->res;
D
dapan 已提交
859
  job->res = NULL;
D
dapan 已提交
860

D
dapan 已提交
861 862 863
_return:
  atomic_val_compare_exchange_32(&job->userFetch, 1, 0);

D
dapan1121 已提交
864
  SCH_RET(code);
D
dapan 已提交
865
}
D
dapan1121 已提交
866

D
dapan1121 已提交
867 868
int32_t scheduleCancelJob(void *pJob) {
  //TODO
D
dapan1121 已提交
869

D
dapan1121 已提交
870 871
  //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST

D
dapan1121 已提交
872 873 874 875 876
  return TSDB_CODE_SUCCESS;
}

void scheduleFreeJob(void *pJob) {
  if (NULL == pJob) {
D
dapan 已提交
877 878
    return;
  }
D
dapan1121 已提交
879

D
dapan 已提交
880
  SSchJob *job = pJob;
D
dapan1121 已提交
881 882

  if (job->status > 0) {
D
dapan1121 已提交
883
    if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) {
D
dapan1121 已提交
884 885 886 887
      qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed
      return;
    }

D
dapan1121 已提交
888
    if (job->status == JOB_TASK_STATUS_EXECUTING) {
D
dapan1121 已提交
889 890
      scheduleCancelJob(pJob);
    }
D
dapan1121 已提交
891 892

    schDropJobAllTasks(job);
D
dapan1121 已提交
893 894 895
  }
  
  //TODO free job
D
dapan1121 已提交
896 897
}

898
void schedulerDestroy(void) {
D
dapan1121 已提交
899 900 901
  if (schMgmt.jobs) {
    taosHashCleanup(schMgmt.jobs); //TODO
    schMgmt.jobs = NULL;
902 903 904
  }
}

D
dapan1121 已提交
905