scheduler.c 82.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16 17
#include "catalog.h"
#include "query.h"
D
dapan1121 已提交
18
#include "schedulerInt.h"
H
Hongze Cheng 已提交
19
#include "tmsg.h"
D
dapan1121 已提交
20
#include "tref.h"
D
dapan1121 已提交
21
#include "trpc.h"
D
dapan1121 已提交
22
#include "command.h"
23

D
dapan1121 已提交
24 25
SSchedulerMgmt schMgmt = {0};

L
Liu Jicong 已提交
26
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); }
D
dapan1121 已提交
27

L
Liu Jicong 已提交
28
FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgmt.jobRef, refId); }
D
dapan1121 已提交
29

L
Liu Jicong 已提交
30
uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }
D
dapan1121 已提交
31

L
Liu Jicong 已提交
32
#if 0
/*
 * Disabled. Would build a 64-bit id from, high to low:
 *   12 bits of a hash of the system UUID | 12 bits of the pid | 24 bits of the ms timestamp | 16-bit serial.
 * NOTE(review): the lazily computed static hashId is not synchronized — revisit before re-enabling.
 */
uint64_t schGenUUID(void) {
  static uint64_t hashId = 0;
  static int32_t  requestSerialId = 0;

  if (0 == hashId) {
    char uid[64];
    if (TSDB_CODE_SUCCESS == taosGetSystemUUID(uid, tListLen(uid))) {
      hashId = MurmurHash3_32(uid, strlen(uid));
    } else {
      qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
  }

  int64_t  nowMs = taosGetTimestampMs();
  uint64_t procId = taosGetPId();
  int32_t  serial = atomic_add_fetch_32(&requestSerialId, 1);

  uint64_t hashBits = (hashId & 0x0FFF) << 52;
  uint64_t pidBits = (procId & 0x0FFF) << 40;
  uint64_t tsBits = (nowMs & 0xFFFFFF) << 16;
  uint64_t serialBits = serial & 0xFFFF;

  return hashBits | pidBits | tsBits | serialBits;
}
#endif
D
dapan1121 已提交
55

L
Liu Jicong 已提交
56 57 58
/*
 * Initialize a task for subplan pPlan on level pLevel.
 * The task is set to NOT_START, given a fresh id from schGenTaskId(), and gets an empty
 * execNodes list with capacity SCH_MAX_CANDIDATE_EP_NUM.
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY if that list cannot be allocated.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  pTask->plan = pPlan;
  pTask->level = pLevel;
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
  pTask->taskId = schGenTaskId();

  pTask->execNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo));
  if (pTask->execNodes != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
  SCH_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

D
dapan1121 已提交
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
/*
 * Allocate and initialize a scheduler job for the query plan pDag.
 *
 * What the job records:
 *   - pJob->sql stores the sql pointer as-is (it is not copied).
 *   - pNodeList, when given, is duplicated into pJob->nodeList.
 *   - In explain mode the explain context is started with startTs.
 *
 * Registration: the job is registered in schMgmt.jobRef and acquired once before being
 * returned through *pSchJob. NOTE(review): the caller presumably releases that reference
 * with schReleaseJob — confirm.
 *
 * Error-path ownership:
 *   - Before registration, the job is freed directly with schFreeJobImpl.
 *   - After registration, the ref pool owns the job, so it is released with taosRemoveRef.
 *     Freeing it directly there would leave a dangling entry in the pool.
 */
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
                   int64_t startTs, bool syncSchedule) {
  int32_t  code = 0;
  int64_t  refId = -1;  // < 0 until the job is registered in schMgmt.jobRef
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->attr.syncSchedule = syncSchedule;
  pJob->transport = transport;
  pJob->sql = sql;

  if (pNodeList != NULL) {
    pJob->nodeList = taosArrayDup(pNodeList);
    if (NULL == pJob->nodeList) {
      // Do not continue with a silently missing node list: task placement would fail later with a misleading error.
      qError("QID:%" PRIx64 " taosArrayDup %d nodes failed", pDag->queryId, (int32_t)taosArrayGetSize(pNodeList));
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs));
  }

  pJob->execTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->succTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->succTasks) {
    SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->failTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->failTasks) {
    SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&pJob->rspSem, 0, 0);

  refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    // SCH_ERR_JRET would fall through if terrno happened to be 0; always take the error path here.
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_SCH_INTERNAL_ERROR;
    goto _return;
  }

  pJob->refId = refId;

  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_NOT_START;

  *pSchJob = pJob;

  return TSDB_CODE_SUCCESS;

_return:

  if (refId < 0) {
    schFreeJobImpl(pJob);
  } else {
    // The ref pool owns the job now; removing the ref releases it through the pool's free callback.
    taosRemoveRef(schMgmt.jobRef, refId);
  }

  SCH_RET(code);
}


D
dapan1121 已提交
145 146 147 148 149 150 151 152
/*
 * Release the values owned by an RPC context. pCtx itself is not freed.
 * NULL pCtx is a no-op.
 *
 * Steps:
 *   - Every value in pCtx->args is passed to its own freeFunc.
 *   - The args hash is cleaned up.
 *   - brokenVal.val is released via brokenVal.freeFunc.
 *
 * Entries without a freeFunc are skipped, the same rule already applied to brokenVal.
 */
void schFreeRpcCtx(SRpcCtx *pCtx) {
  if (NULL == pCtx) {
    return;
  }

  void *pIter = taosHashIterate(pCtx->args, NULL);
  while (pIter) {
    SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;

    // An entry without a destructor must not crash the cleanup.
    if (ctxVal->freeFunc) {
      (*ctxVal->freeFunc)(ctxVal->val);
    }

    pIter = taosHashIterate(pCtx->args, pIter);
  }

  taosHashCleanup(pCtx->args);
  pCtx->args = NULL;  // do not leave a dangling hash pointer behind

  if (pCtx->brokenVal.freeFunc) {
    (*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
    pCtx->brokenVal.val = NULL;
  }
}

L
Liu Jicong 已提交
165
/*
 * Release the resources owned by a task. pTask itself is not freed.
 *
 * Released: the candidate address list, the message buffer, the child/parent task lists
 * and the exec node list.
 *
 * Each destroyed array pointer is reset to NULL, matching taosMemoryFreeClear on msg,
 * so a repeated call cannot double-free.
 */
void schFreeTask(SSchTask *pTask) {
  if (pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
    pTask->candidateAddrs = NULL;
  }

  taosMemoryFreeClear(pTask->msg);

  if (pTask->children) {
    taosArrayDestroy(pTask->children);
    pTask->children = NULL;
  }

  if (pTask->parents) {
    taosArrayDestroy(pTask->parents);
    pTask->parents = NULL;
  }

  if (pTask->execNodes) {
    taosArrayDestroy(pTask->execNodes);
    pTask->execNodes = NULL;
  }
}

D
dapan1121 已提交
185 186 187 188 189 190
/*
 * Report whether the job has reached a state in which no further work should be scheduled:
 * FAILED, CANCELLED, CANCELLING, DROPPING or SUCCEED.
 * The status that was examined is written to *pStatus when pStatus is not NULL.
 */
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t jobStatus = SCH_GET_JOB_STATUS(pJob);
  if (NULL != pStatus) {
    *pStatus = jobStatus;
  }

  switch (jobStatus) {
    case JOB_TASK_STATUS_FAILED:
    case JOB_TASK_STATUS_CANCELLED:
    case JOB_TASK_STATUS_CANCELLING:
    case JOB_TASK_STATUS_DROPPING:
    case JOB_TASK_STATUS_SUCCEED:
      return true;
    default:
      return false;
  }
}

D
dapan1121 已提交
196
// Printable name of a message type, for logging only.
// Non-positive values print as "null" instead of being passed to TMSG_INFO: -1 is the marker that
// SCH_SET_TASK_LASTMSG_TYPE(pTask, -1) leaves after each accepted response, and 0 is an unset type.
static const char *schMsgTypeName(int32_t msgType) { return (msgType > 0) ? TMSG_INFO(msgType) : "null"; }

/*
 * Check that a response of type msgType is acceptable for pTask. The check uses the last request
 * type recorded on the task and the task's current status. On acceptance the recorded type is
 * reset to -1.
 *
 * Per response type:
 *   - TDMT_SCH_LINK_BROKEN, TDMT_VND_EXPLAIN_RSP: always accepted; nothing is updated.
 *   - TDMT_VND_QUERY_RSP: mismatches are only logged at debug level, because a query rsp may be
 *     processed after the ready rsp.
 *   - TDMT_VND_RES_READY_RSP, TDMT_VND_FETCH_RSP: the recorded type must be the matching request
 *     or -1, and the task must be EXECUTING or PARTIAL_SUCCEED.
 *   - TDMT_VND_CREATE_TABLE_RSP, TDMT_VND_SUBMIT_RSP: the recorded type must be exactly
 *     msgType - 1, with the same status requirement.
 *   - Anything else: TSDB_CODE_QRY_INVALID_INPUT.
 *
 * Returns TSDB_CODE_SCH_STATUS_ERROR on a type or status mismatch in the strict cases.
 */
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
  int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
  int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  int32_t reqMsgType = msgType - 1;  // the code assumes a request type is its response type minus one

  switch (msgType) {
    case TDMT_SCH_LINK_BROKEN:
    case TDMT_VND_EXPLAIN_RSP:
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_QUERY_RSP:  // query_rsp may be processed later than ready_rsp
      if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
        SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", schMsgTypeName(lastMsgType),
                      TMSG_INFO(msgType));
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_RES_READY_RSP:
      reqMsgType = TDMT_VND_QUERY;
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", schMsgTypeName(lastMsgType),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_FETCH_RSP:
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", schMsgTypeName(lastMsgType),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_CREATE_TABLE_RSP:
    case TDMT_VND_SUBMIT_RSP:
      break;
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (lastMsgType != reqMsgType) {
    // lastMsgType may be -1 here (a response was already accepted), so it must not reach TMSG_INFO directly.
    SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", schMsgTypeName(lastMsgType),
                  TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
    SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                  TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);

  return TSDB_CODE_SUCCESS;
}

/*
 * Atomically move pJob->status to newStatus if the transition is allowed.
 *
 * Allowed transitions:
 *   NULL                          -> NOT_START
 *   NOT_START                     -> EXECUTING
 *   EXECUTING                     -> PARTIAL_SUCCEED | FAILED | CANCELLING | CANCELLED | DROPPING
 *   PARTIAL_SUCCEED               -> FAILED | SUCCEED | DROPPING
 *   SUCCEED | FAILED | CANCELLING -> DROPPING
 *
 * Rejections:
 *   - CANCELLED and DROPPING accept nothing (TSDB_CODE_QRY_JOB_FREED).
 *   - Updating to the current status, or any other transition, yields TSDB_CODE_QRY_APP_ERROR.
 *
 * If the compare-and-swap loses a race, the current status is re-read and re-validated.
 */
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);
    if (newStatus == oriStatus) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    bool allowed = false;
    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        allowed = (JOB_TASK_STATUS_NOT_START == newStatus);
        break;
      case JOB_TASK_STATUS_NOT_START:
        allowed = (JOB_TASK_STATUS_EXECUTING == newStatus);
        break;
      case JOB_TASK_STATUS_EXECUTING:
        allowed = (JOB_TASK_STATUS_PARTIAL_SUCCEED == newStatus || JOB_TASK_STATUS_FAILED == newStatus ||
                   JOB_TASK_STATUS_CANCELLING == newStatus || JOB_TASK_STATUS_CANCELLED == newStatus ||
                   JOB_TASK_STATUS_DROPPING == newStatus);
        break;
      case JOB_TASK_STATUS_PARTIAL_SUCCEED:
        allowed = (JOB_TASK_STATUS_FAILED == newStatus || JOB_TASK_STATUS_SUCCEED == newStatus ||
                   JOB_TASK_STATUS_DROPPING == newStatus);
        break;
      case JOB_TASK_STATUS_SUCCEED:
      case JOB_TASK_STATUS_FAILED:
      case JOB_TASK_STATUS_CANCELLING:
        allowed = (JOB_TASK_STATUS_DROPPING == newStatus);
        break;
      case JOB_TASK_STATUS_CANCELLED:
      case JOB_TASK_STATUS_DROPPING:
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;
      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        break;
    }

    if (!allowed) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      return TSDB_CODE_SUCCESS;
    }
    // Another thread changed the status first: re-read and validate again.
  }

_return:

  SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  SCH_RET(code);
}

D
dapan1121 已提交
349
/*
 * Fill in pTask->children and pTask->parents for every task of pJob.
 * Subplan pointers from pPlan->pChildren / pPlan->pParents are translated into task pointers
 * through planToTask (key: SSubplan *, value: SSchTask *).
 *
 * Structural checks (TSDB_CODE_SCH_INTERNAL_ERROR on violation):
 *   - a task on the lowest level (pJob->levelIdx) has no children;
 *   - a level-0 task has no parents, and every task on another level has at least one;
 *   - a query job has no more than one task on level 0.
 *
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY if a relation list cannot be allocated or extended.
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      } else if (0 != pLevel->level) {
        SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  // Level 0 holds the tasks without parents; a query job must converge to at most one of them.
  SSchLevel *pTopLevel = taosArrayGet(pJob->levels, 0);
  if (NULL == pTopLevel) {
    SCH_JOB_ELOG("invalid job levels, levelNum:%d", pJob->levelNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  if (SCH_IS_QUERY_JOB(pJob) && pTopLevel->taskNum > 1) {
    SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pTopLevel->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
431
/*
 * Remember the candidate address the task is currently using (candidateAddrs[candidateIdx])
 * as its succeedAddr. Returns TSDB_CODE_SCH_INTERNAL_ERROR if that index has no entry.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (pAddr != NULL) {
    pTask->succeedAddr = *pAddr;
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                (int32_t)taosArrayGetSize(pTask->candidateAddrs));
  SCH_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}

D
dapan1121 已提交
444 445
/*
 * Append (addr, handle) to the task's execNodes list, i.e. record that the task was sent to
 * this node. Returns TSDB_CODE_QRY_OUT_OF_MEMORY if the list cannot grow.
 */
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) {
  SSchNodeInfo info = {0};
  info.addr = *addr;
  info.handle = handle;

  void *pSlot = taosArrayPush(pTask->execNodes, &info);
  if (NULL == pSlot) {
    SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno);
    SCH_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode recorded, handle:%p", handle);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
456

X
Xiaoyu Wang 已提交
457
/*
 * Validate the query plan pDag and build pJob's level/task structure from it.
 *
 * Plan layout: pDag->pSubplans is a list of levels (SNodeListNode), each holding that level's
 * subplans.
 *
 * Building:
 *   - Each level becomes an SSchLevel appended to pJob->levels.
 *   - Each subplan becomes an SSchTask appended to the level's subTasks.
 *   - The temporary subplan -> task map is then handed to schBuildTaskRalation to link
 *     parents and children.
 *   - pJob->subPlans keeps a reference to pDag->pSubplans (not a copy).
 *
 * Errors: TSDB_CODE_QRY_INVALID_INPUT for an empty or malformed plan, and
 * TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure. Structures already attached to pJob
 * are left for the caller to free along with the job.
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Keyed by the SSubplan pointer value itself, hence the pointer-sized integer hash.
  SHashObj *planToTask = taosHashInit(
      SCHEDULE_DEFAULT_MAX_TASK_NUM,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  pJob->subPlans = pDag->pSubplans;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    // NOTE(review): planToTask stores pointers into subTasks. This presumably relies on the array
    // being created with capacity taskNum and receiving exactly taskNum pushes, so it never
    // reallocates — confirm taosArrayInit semantics.
    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = &task;  // presumably referenced by the SCH_TASK_*LOG macros

      SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));

      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        // The task never reached the level, so nothing else will free what schInitTask allocated.
        taosArrayDestroy(task.execNodes);
        task.execNodes = NULL;
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:
  // Reached on success as well: the plan -> task map is only needed while building.
  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
565 566
/*
 * Build the list of node addresses the task may be sent to (pTask->candidateAddrs), once.
 * Returns immediately if the list already exists.
 *
 * Candidate sources, in order:
 *   - If the subplan carries an execNode with at least one ep, that node is the only candidate.
 *   - Otherwise up to SCH_MAX_CANDIDATE_EP_NUM entries are taken from pJob->nodeList.
 *
 * Errors: TSDB_CODE_QRY_INVALID_INPUT when no candidate is found, TSDB_CODE_QRY_OUT_OF_MEMORY
 * on allocation failure. On any failure the partially built list is destroyed and reset to NULL.
 * Otherwise the early "already set" return would make later calls succeed with an empty list.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  int32_t addNum = 0;
  int32_t nodeNum = 0;

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  if (pJob->nodeList) {
    nodeNum = taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroy(pTask->candidateAddrs);
  pTask->candidateAddrs = NULL;

  SCH_RET(code);
}
D
dapan1121 已提交
621

D
dapan1121 已提交
622
/*
 * Register pTask in pJob->execTasks (keyed by taskId).
 *
 * Returns:
 *   - TSDB_CODE_SCH_INTERNAL_ERROR if the task is already registered;
 *   - TSDB_CODE_QRY_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putRc = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == putRc) {
    SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(putRc)) {
    SCH_TASK_ELOG("task already in execTask list, code:%x", putRc);
    SCH_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
  SCH_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

D
dapan1121 已提交
639 640
/*
 * Move pTask from pJob->execTasks to pJob->succTasks (both keyed by taskId).
 * A task missing from execTasks is only logged.
 *
 * *moved is always written:
 *   - true when the task ends up in succTasks, including when it was already there
 *     (which returns TSDB_CODE_SCH_STATUS_ERROR);
 *   - false when the insertion fails with TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;  // make the out-parameter defined on every return path

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
665 666
/*
 * Move pTask from pJob->execTasks to pJob->failTasks (both keyed by taskId).
 * A task missing from execTasks is only logged.
 *
 * *moved is set to:
 *   - true when the task ends up in failTasks, including when it was already there
 *     (which returns TSDB_CODE_SCH_STATUS_ERROR);
 *   - false when the insertion fails with TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  int32_t removeRc = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (0 != removeRc) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t putRc = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == putRc) {
    *moved = true;
    SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(putRc)) {
    *moved = true;
    SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
  SCH_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

D
dapan1121 已提交
692 693
/*
 * Move pTask from pJob->succTasks back to pJob->execTasks (both keyed by taskId).
 * A task missing from succTasks is only logged.
 *
 * *moved is always written:
 *   - true when the task ends up in execTasks, including when it was already there
 *     (which returns TSDB_CODE_SCH_STATUS_ERROR);
 *   - false when the insertion fails with TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;  // make the out-parameter defined on every return path

  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
717
/*
 * Count one more attempt for pTask and decide whether it should be retried after errCode.
 * The decision goes to *needRetry; the function itself always returns TSDB_CODE_SUCCESS.
 *
 * No retry when any of these holds:
 *   - the job is stopping;
 *   - REQUEST_MAX_TRY_TIMES has been reached;
 *   - errCode is not a retryable scheduler error;
 *   - every endpoint has been tried: the execNode's eps for data-source tasks, the candidate
 *     addresses for other tasks.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int8_t jobStatus = 0;
  ++pTask->tryTimes;
  *needRetry = false;

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(jobStatus));
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) {
    SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  // TODO CHECK epList/condidateList
  if (SCH_IS_DATA_SRC_TASK(pTask)) {
    int32_t epNum = SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode);
    if (pTask->tryTimes >= epNum) {
      SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes, epNum);
      return TSDB_CODE_SUCCESS;
    }
  } else {
    int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
    if (pTask->candidateIdx + 1 >= candidateNum) {
      SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                    pTask->candidateIdx, candidateNum);
      return TSDB_CODE_SUCCESS;
    }
  }

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

/*
 * Relaunch pTask on its next endpoint.
 *
 * Steps:
 *   1. Decrement the level's launched-task counter.
 *   2. For flow-controlled tasks, give back the flow quota and launch queued tasks.
 *   3. Advance to the next endpoint: switch the execNode epset for data-source tasks, or move to
 *      the next candidate address for other tasks.
 *   4. Launch the task again.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  if (!SCH_IS_DATA_SRC_TASK(pTask)) {
    ++pTask->candidateIdx;
  } else {
    SCH_SWITCH_EPSET(&pTask->plan->execNode);
  }

  SCH_RET(schLaunchTask(pJob, pTask));
}

D
dapan1121 已提交
783
/*
 * Overwrite the transport info stored for the heartbeat connection to epId with *trans.
 * The copy is made under the entry's write lock.
 * Returns TSDB_CODE_QRY_APP_ERROR when schMgmt.hbConnections has no entry for epId.
 * NOTE(review): hb points into the hash without holding a reference. Confirm that entries
 * cannot be removed concurrently.
 */
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  memcpy(&hb->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId,
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819
/*
 * Record errCode as the job's error code.
 *
 * A success code is ignored. If the job has no error yet, errCode is installed
 * with a compare-and-swap. If the job already holds an error, it is replaced only
 * when the existing one does NOT satisfy NEED_CLIENT_HANDLE_ERROR and the new one does.
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (errCode == TSDB_CODE_SUCCESS) {
    return;
  }

  int32_t curCode = atomic_load_32(&pJob->errCode);
  if (curCode == TSDB_CODE_SUCCESS) {
    int32_t prevCode = atomic_val_compare_exchange_32(&pJob->errCode, curCode, errCode);
    if (prevCode == curCode) {
      SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
      return;
    }

    // lost the race: someone else stored an error first, re-read it
    curCode = atomic_load_32(&pJob->errCode);
  }

  bool overwrite = !NEED_CLIENT_HANDLE_ERROR(curCode) && NEED_CLIENT_HANDLE_ERROR(errCode);
  if (!overwrite) {
    return;
  }

  atomic_store_32(&pJob->errCode, errCode);
  SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
}

/*
 * Shared failure path: move the job to `status` (FAILED or DROPPING) and record errCode.
 *
 * If schCheckAndUpdateJobStatus rejects the transition (e.g. the job is already
 * FAILED), its error is returned and nothing else happens. Otherwise a waiter on
 * rspSem is woken when the user is fetching or the job is scheduled synchronously.
 * Returns the job's resulting errCode.
 */
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));

  schUpdateJobErrCode(pJob, errCode);

  bool hasWaiter = atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule;
  if (hasWaiter) {
    tsem_post(&pJob->rspSem);
  }

  int32_t finalCode = atomic_load_32(&pJob->errCode);
  SCH_JOB_DLOG("job failed with error: %s", tstrerror(finalCode));

  SCH_RET(finalCode);
}

D
dapan1121 已提交
850
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
851 852 853 854 855 856 857 858 859
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode));
}

/*
 * Move the job to DROPPING, recording errCode.
 * Error handling is complete inside; callers should not process the returned error again.
 */
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  int32_t ret = schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode);
  SCH_RET(ret);
}

D
dapan1121 已提交
860
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
861 862
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  int32_t code = 0;
L
Liu Jicong 已提交
863

D
dapan1121 已提交
864
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED));
D
dapan 已提交
865

D
dapan1121 已提交
866
  if (pJob->attr.syncSchedule) {
D
dapan 已提交
867
    tsem_post(&pJob->rspSem);
D
dapan 已提交
868
  }
L
Liu Jicong 已提交
869

D
dapan1121 已提交
870 871 872
  if (atomic_load_8(&pJob->userFetch)) {
    SCH_ERR_JRET(schFetchFromRemote(pJob));
  }
D
dapan 已提交
873

D
dapan 已提交
874
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
875 876 877

_return:

D
dapan1121 已提交
878
  SCH_RET(schProcessOnJobFailure(pJob, code));
D
dapan 已提交
879 880
}

881
/*
 * Signal that fetched data is available.
 * Clears the in-flight remote fetch flag (1 -> 0 only) and wakes the waiter on rspSem.
 */
void schProcessOnDataFetched(SSchJob *job) {
  (void)atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  tsem_post(&job->rspSem);
}

D
dapan1121 已提交
886
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
887
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
D
dapan1121 已提交
888
  int8_t status = 0;
L
Liu Jicong 已提交
889

D
dapan1121 已提交
890
  if (schJobNeedToStop(pJob, &status)) {
D
dapan1121 已提交
891
    SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
892 893 894
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

L
Liu Jicong 已提交
895 896
  bool    needRetry = false;
  bool    moved = false;
D
dapan1121 已提交
897
  int32_t taskDone = 0;
D
dapan1121 已提交
898
  int32_t code = 0;
D
dapan1121 已提交
899

H
Haojun Liao 已提交
900
  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
L
Liu Jicong 已提交
901

D
dapan1121 已提交
902
  SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));
L
Liu Jicong 已提交
903

D
dapan1121 已提交
904
  if (!needRetry) {
H
Haojun Liao 已提交
905
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
D
dapan1121 已提交
906 907

    if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
D
dapan1121 已提交
908 909
      SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));
    } else {
D
dapan1121 已提交
910
      SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
D
dapan1121 已提交
911
      SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
912 913 914
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
L
Liu Jicong 已提交
915

D
dapan1121 已提交
916
    if (SCH_IS_WAIT_ALL_JOB(pJob)) {
D
dapan1121 已提交
917 918 919 920 921
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskFailed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

D
dapan1121 已提交
922
      schUpdateJobErrCode(pJob, errCode);
L
Liu Jicong 已提交
923

D
dapan1121 已提交
924
      if (taskDone < pTask->level->taskNum) {
L
Liu Jicong 已提交
925
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
D
dapan 已提交
926
        SCH_RET(errCode);
D
dapan1121 已提交
927 928 929
      }
    }
  } else {
D
dapan1121 已提交
930
    SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));
L
Liu Jicong 已提交
931

D
dapan 已提交
932 933
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
934

D
dapan1121 已提交
935 936
_return:

D
dapan1121 已提交
937
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
D
dapan1121 已提交
938 939
}

D
dapan1121 已提交
940
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
941
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
L
Liu Jicong 已提交
942
  bool    moved = false;
D
dapan1121 已提交
943 944
  int32_t code = 0;

D
dapan1121 已提交
945
  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
D
dapan 已提交
946

D
dapan1121 已提交
947
  SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
D
dapan1121 已提交
948

D
dapan1121 已提交
949
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
D
dapan1121 已提交
950

D
dapan1121 已提交
951
  SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
D
dapan1121 已提交
952 953

  SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
L
Liu Jicong 已提交
954

D
dapan1121 已提交
955
  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
D
dapan 已提交
956
  if (parentNum == 0) {
L
Liu Jicong 已提交
957
    int32_t taskDone = 0;
D
dapan1121 已提交
958
    if (SCH_IS_WAIT_ALL_JOB(pJob)) {
D
dapan1121 已提交
959 960 961 962
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
L
Liu Jicong 已提交
963

D
dapan1121 已提交
964
      if (taskDone < pTask->level->taskNum) {
S
Shengliang Guan 已提交
965
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
D
dapan1121 已提交
966
        return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
967
      } else if (taskDone > pTask->level->taskNum) {
D
dapan1121 已提交
968
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
D
dapan1121 已提交
969 970
      }

D
dapan1121 已提交
971
      if (pTask->level->taskFailed > 0) {
D
dapan1121 已提交
972 973 974
        SCH_RET(schProcessOnJobFailure(pJob, 0));
      } else {
        SCH_RET(schProcessOnJobPartialSuccess(pJob));
D
dapan1121 已提交
975 976
      }
    } else {
D
dapan1121 已提交
977
      pJob->resNode = pTask->succeedAddr;
D
dapan1121 已提交
978
    }
D
dapan 已提交
979

D
dapan1121 已提交
980
    pJob->fetchTask = pTask;
D
dapan1121 已提交
981

D
dapan1121 已提交
982
    SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved));
L
Liu Jicong 已提交
983

D
dapan1121 已提交
984
    SCH_RET(schProcessOnJobPartialSuccess(pJob));
D
dapan 已提交
985 986
  }

L
Liu Jicong 已提交
987 988 989 990
  /*
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
      strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
      job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
D
dapan 已提交
991

L
Liu Jicong 已提交
992 993 994
      ++job->dataSrcEps.numOfEps;
    }
  */
D
dapan 已提交
995

D
dapan 已提交
996
  for (int32_t i = 0; i < parentNum; ++i) {
D
dapan1121 已提交
997
    SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
L
Liu Jicong 已提交
998
    int32_t   readyNum = atomic_add_fetch_32(&par->childReady, 1);
D
dapan 已提交
999

D
dapan1121 已提交
1000
    SCH_LOCK(SCH_WRITE, &par->lock);
L
Liu Jicong 已提交
1001 1002 1003 1004
    SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
                                    .taskId = pTask->taskId,
                                    .schedId = schMgmt.sId,
                                    .addr = pTask->succeedAddr};
X
Xiaoyu Wang 已提交
1005
    qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
D
dapan1121 已提交
1006
    SCH_UNLOCK(SCH_WRITE, &par->lock);
L
Liu Jicong 已提交
1007

D
dapan 已提交
1008
    if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
D
dapan1121 已提交
1009
      SCH_ERR_RET(schLaunchTaskImpl(pJob, par));
D
dapan 已提交
1010 1011 1012 1013 1014
    }
  }

  return TSDB_CODE_SUCCESS;

D
dapan1121 已提交
1015
_return:
D
dapan 已提交
1016

D
dapan1121 已提交
1017 1018
  SCH_RET(schProcessOnJobFailure(pJob, code));
}
D
dapan 已提交
1019

D
dapan1121 已提交
1020 1021 1022
/*
 * Start a remote fetch for the job's fetchTask unless one is already running.
 *
 * remoteFetch acts as an in-flight flag that is claimed with a 0 -> 1 CAS. If a
 * result is already cached in resData, the flag is released and nothing is sent.
 * If sending fails, the flag is released and the failure is handed to
 * schProcessOnTaskFailure. Error handling is complete inside; callers should not
 * process the returned error again.
 */
int32_t schFetchFromRemote(SSchJob *pJob) {
  int32_t code = 0;

  int32_t prevFlag = atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1);
  if (prevFlag != 0) {
    SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
    return TSDB_CODE_SUCCESS;
  }

  void *cachedRes = atomic_load_ptr(&pJob->resData);
  if (NULL != cachedRes) {
    atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
    SCH_JOB_DLOG("res already fetched, res:%p", cachedRes);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
  return TSDB_CODE_SUCCESS;

_return:
  atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}

D
dapan1121 已提交
1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060
/*
 * Publish a finished explain result.
 *
 * pRsp becomes the job's resData, and its row count (network byte order, converted
 * with htonl) becomes resNumOfRows. The task is marked SUCCEED and the fetch waiter
 * is signalled.
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  int32_t rowNum = (int32_t)htonl(pRsp->numOfRows);

  SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", rowNum, pRsp->completed);

  atomic_store_32(&pJob->resNumOfRows, rowNum);
  atomic_store_ptr(&pJob->resData, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);

  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1061
// Note: no more task error processing, handled in function internal
L
Liu Jicong 已提交
1062 1063
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
                             int32_t rspCode) {
D
dapan1121 已提交
1064
  int32_t code = 0;
L
Liu Jicong 已提交
1065 1066
  int8_t  status = 0;

D
dapan1121 已提交
1067
  if (schJobNeedToStop(pJob, &status)) {
L
Liu Jicong 已提交
1068 1069
    SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
                  rspCode);
D
dapan1121 已提交
1070 1071
    SCH_RET(atomic_load_32(&pJob->errCode));
  }
H
Haojun Liao 已提交
1072

D
dapan1121 已提交
1073 1074
  SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));

D
dapan1121 已提交
1075
  switch (msgType) {
H
Haojun Liao 已提交
1076
    case TDMT_VND_CREATE_TABLE_RSP: {
X
Xiaoyu Wang 已提交
1077 1078
      SVCreateTbBatchRsp batchRsp = {0};
      if (msg) {
D
dapan1121 已提交
1079
        SCH_ERR_JRET(tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp));
X
Xiaoyu Wang 已提交
1080 1081 1082 1083 1084 1085 1086
        if (batchRsp.rspList) {
          int32_t num = taosArrayGetSize(batchRsp.rspList);
          for (int32_t i = 0; i < num; ++i) {
            SVCreateTbRsp *rsp = taosArrayGet(batchRsp.rspList, i);
            if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
              taosArrayDestroy(batchRsp.rspList);
              SCH_ERR_JRET(rsp->code);
D
dapan 已提交
1087 1088
            }
          }
L
Liu Jicong 已提交
1089

X
Xiaoyu Wang 已提交
1090
          taosArrayDestroy(batchRsp.rspList);
D
dapan1121 已提交
1091
        }
L
Liu Jicong 已提交
1092 1093
      }

L
Liu Jicong 已提交
1094 1095 1096 1097
      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
D
dapan1121 已提交
1098
    case TDMT_VND_SUBMIT_RSP: {
X
Xiaoyu Wang 已提交
1099 1100 1101 1102
      if (msg) {
        SSubmitRsp *rsp = (SSubmitRsp *)msg;
        SCH_ERR_JRET(rsp->code);
      }
1103

L
Liu Jicong 已提交
1104
      SCH_ERR_JRET(rspCode);
D
dapan1121 已提交
1105

L
Liu Jicong 已提交
1106 1107 1108 1109
      SSubmitRsp *rsp = (SSubmitRsp *)msg;
      if (rsp) {
        pJob->resNumOfRows += rsp->affectedRows;
      }
D
dapan1121 已提交
1110

L
Liu Jicong 已提交
1111
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
D
dapan1121 已提交
1112

L
Liu Jicong 已提交
1113 1114
      break;
    }
D
dapan1121 已提交
1115
    case TDMT_VND_QUERY_RSP: {
L
Liu Jicong 已提交
1116 1117
      SQueryTableRsp rsp = {0};
      if (msg) {
D
dapan1121 已提交
1118
        SCH_ERR_JRET(tDeserializeSQueryTableRsp(msg, msgSize, &rsp));
L
Liu Jicong 已提交
1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130
        SCH_ERR_JRET(rsp.code);
      }

      SCH_ERR_JRET(rspCode);

      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      // SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));

      break;
L
Liu Jicong 已提交
1131
    }
D
dapan1121 已提交
1132
    case TDMT_VND_RES_READY_RSP: {
L
Liu Jicong 已提交
1133 1134 1135 1136 1137
      SResReadyRsp *rsp = (SResReadyRsp *)msg;

      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
1138
      }
L
Liu Jicong 已提交
1139 1140
      SCH_ERR_JRET(rsp->code);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
D
dapan1121 已提交
1141

L
Liu Jicong 已提交
1142 1143
      break;
    }
D
dapan1121 已提交
1144 1145 1146 1147 1148
    case TDMT_VND_EXPLAIN_RSP: {
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
D
dapan1121 已提交
1149
      
D
dapan1121 已提交
1150 1151 1152 1153 1154 1155 1156 1157 1158 1159
      if (!SCH_IS_EXPLAIN_JOB(pJob)) {
        SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (pJob->resData) {
        SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }

D
dapan1121 已提交
1160 1161 1162 1163 1164 1165
      SExplainRsp rsp = {0};
      if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
        taosMemoryFree(rsp.subplanInfo);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

D
dapan1121 已提交
1166
      SRetrieveTableRsp *pRsp = NULL;
D
dapan1121 已提交
1167
      SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));
D
dapan1121 已提交
1168 1169

      if (pRsp) {
D
dapan1121 已提交
1170
        SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
D
dapan1121 已提交
1171 1172 1173
      }
      break;
    }
L
Liu Jicong 已提交
1174 1175
    case TDMT_VND_FETCH_RSP: {
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
D
dapan1121 已提交
1176

L
Liu Jicong 已提交
1177 1178 1179 1180
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
D
dapan1121 已提交
1181

D
dapan1121 已提交
1182
      if (SCH_IS_EXPLAIN_JOB(pJob)) {
D
dapan1121 已提交
1183 1184 1185 1186 1187 1188 1189
        if (rsp->completed) {          
          SRetrieveTableRsp *pRsp = NULL;
          SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
          if (pRsp) {
            SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
          }
          
D
dapan1121 已提交
1190 1191 1192
          return TSDB_CODE_SUCCESS;
        }

D
dapan1121 已提交
1193 1194
        atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

D
dapan1121 已提交
1195 1196 1197 1198 1199
        SCH_ERR_JRET(schFetchFromRemote(pJob));

        return TSDB_CODE_SUCCESS;
      }

X
Xiaoyu Wang 已提交
1200 1201
      if (pJob->resData) {
        SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData);
wafwerar's avatar
wafwerar 已提交
1202
        taosMemoryFreeClear(rsp);
L
Liu Jicong 已提交
1203 1204
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }
H
Haojun Liao 已提交
1205

X
Xiaoyu Wang 已提交
1206
      atomic_store_ptr(&pJob->resData, rsp);
L
Liu Jicong 已提交
1207
      atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
D
dapan1121 已提交
1208

L
Liu Jicong 已提交
1209 1210
      if (rsp->completed) {
        SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
D
dapan1121 已提交
1211
      }
L
Liu Jicong 已提交
1212 1213 1214 1215 1216 1217

      SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

      schProcessOnDataFetched(pJob);
      break;
    }
D
dapan1121 已提交
1218
    case TDMT_VND_DROP_TASK_RSP: {
L
Liu Jicong 已提交
1219 1220 1221 1222 1223
      // SHOULD NEVER REACH HERE
      SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
    }
D
dapan1121 已提交
1224 1225 1226 1227
    case TDMT_SCH_LINK_BROKEN:
      SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
      break;
D
dapan1121 已提交
1228
    default:
D
dapan1121 已提交
1229
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
D
dapan1121 已提交
1230
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
1231 1232 1233 1234 1235
  }

  return TSDB_CODE_SUCCESS;

_return:
D
dapan1121 已提交
1236

D
dapan1121 已提交
1237
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
D
dapan1121 已提交
1238 1239
}

D
dapan1121 已提交
1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255
/*
 * Look up taskId in pTaskList, a hash of taskId -> SSchTask*.
 *
 * *pTask is written only when a non-NULL task is found. Otherwise it is left
 * untouched, so callers should initialize it. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
  int32_t entryNum = taosHashGetSize(pTaskList);

  SSchTask **slot = (entryNum > 0) ? taosHashGet(pTaskList, &taskId, sizeof(taskId)) : NULL;
  if (slot != NULL && *slot != NULL) {
    *pTask = *slot;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1256 1257
/*
 * Store the connection handle on the task's exec node.
 *
 * This happens only for a successful rsp (rspCode == 0) on a task that has exactly
 * one exec node. In every other case nothing is changed. Always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
  bool singleNode = (NULL != pTask->execNodes) && (taosArrayGetSize(pTask->execNodes) == 1);

  if (TSDB_CODE_SUCCESS == rspCode && singleNode) {
    SSchNodeInfo *execNode = taosArrayGet(pTask->execNodes, 0);
    execNode->handle = handle;
  }

  return TSDB_CODE_SUCCESS;
}


L
Liu Jicong 已提交
1268
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
L
Liu Jicong 已提交
1269
  int32_t                code = 0;
D
dapan1121 已提交
1270
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
L
Liu Jicong 已提交
1271
  SSchTask              *pTask = NULL;
L
Liu Jicong 已提交
1272

D
dapan1121 已提交
1273
  SSchJob *pJob = schAcquireJob(pParam->refId);
D
dapan1121 已提交
1274
  if (NULL == pJob) {
D
dapan1121 已提交
1275
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64,
L
Liu Jicong 已提交
1276
          pParam->queryId, pParam->taskId, pParam->refId);
D
dapan1121 已提交
1277
    SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
1278 1279
  }

D
dapan1121 已提交
1280 1281 1282 1283 1284 1285 1286 1287
  schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
  if (NULL == pTask) {
    if (TDMT_VND_EXPLAIN_RSP == msgType) {
      schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
    } else {
      SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
D
dapan1121 已提交
1288
  }
D
dapan1121 已提交
1289 1290 1291
  
  if (NULL == pTask) {
    SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
D
dapan1121 已提交
1292 1293
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }
D
dapan1121 已提交
1294

D
dapan1121 已提交
1295
  SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
L
Liu Jicong 已提交
1296

L
Liu Jicong 已提交
1297
  SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
D
dapan1121 已提交
1298
  schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
D
dapan1121 已提交
1299
  SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
D
dapan1121 已提交
1300

H
Haojun Liao 已提交
1301
_return:
D
dapan1121 已提交
1302
  if (pJob) {
D
dapan1121 已提交
1303
    schReleaseJob(pParam->refId);
D
dapan1121 已提交
1304 1305
  }

wafwerar's avatar
wafwerar 已提交
1306
  taosMemoryFreeClear(param);
D
dapan1121 已提交
1307 1308 1309
  SCH_RET(code);
}

L
Liu Jicong 已提交
1310
int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1311 1312
  return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
H
Haojun Liao 已提交
1313

L
Liu Jicong 已提交
1314
int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
H
Haojun Liao 已提交
1315 1316 1317
  return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}

L
Liu Jicong 已提交
1318
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1319 1320
  return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
H
Haojun Liao 已提交
1321

L
Liu Jicong 已提交
1322
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1323 1324
  return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
H
Haojun Liao 已提交
1325

L
Liu Jicong 已提交
1326
int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1327 1328
  return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
H
Haojun Liao 已提交
1329

D
dapan1121 已提交
1330 1331 1332 1333
/* Callback registered for TDMT_VND_EXPLAIN: forwards to schHandleCallback as a TDMT_VND_EXPLAIN_RSP. */
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_EXPLAIN_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}

L
Liu Jicong 已提交
1334
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1335
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
L
Liu Jicong 已提交
1336
  qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
1337
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1338 1339
}

L
Liu Jicong 已提交
1340
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1341 1342 1343 1344
  if (code) {
    qError("hb rsp error:%s", tstrerror(code));
    SCH_ERR_RET(code);
  }
L
Liu Jicong 已提交
1345

D
dapan1121 已提交
1346 1347 1348 1349 1350 1351
  SSchedulerHbRsp rsp = {0};
  if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
    qError("invalid hb rsp msg, size:%d", pMsg->len);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

D
dapan1121 已提交
1352 1353
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;

D
dapan1121 已提交
1354 1355 1356
  SSchTrans trans = {0};
  trans.transInst = pParam->transport;
  trans.transHandle = pMsg->handle;
L
Liu Jicong 已提交
1357

D
dapan1121 已提交
1358
  SCH_ERR_RET(schUpdateHbConnection(&rsp.epId, &trans));
D
dapan1121 已提交
1359 1360

  int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
L
Liu Jicong 已提交
1361 1362
  qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
         rsp.epId.ep.port);
D
dapan1121 已提交
1363

D
dapan1121 已提交
1364 1365
  for (int32_t i = 0; i < taskNum; ++i) {
    STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
L
Liu Jicong 已提交
1366

D
dapan1121 已提交
1367 1368
    SSchJob *pJob = schAcquireJob(taskStatus->refId);
    if (NULL == pJob) {
L
Liu Jicong 已提交
1369 1370 1371
      qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
            taskStatus->queryId, taskStatus->taskId);
      // TODO DROP TASK FROM SERVER!!!!
D
dapan1121 已提交
1372 1373
      continue;
    }
L
Liu Jicong 已提交
1374

D
dapan1121 已提交
1375
    // TODO
L
Liu Jicong 已提交
1376 1377 1378

    SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId,
                 jobTaskStatusStr(taskStatus->status));
D
dapan1121 已提交
1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389

    schReleaseJob(taskStatus->refId);
  }

_return:

  tFreeSSchedulerHbRsp(&rsp);

  SCH_RET(code);
}

D
dapan1121 已提交
1390 1391 1392 1393
int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
  rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);

D
dapan1121 已提交
1394 1395
  qDebug("handle %p is broken", pMsg->handle);

D
dapan1121 已提交
1396 1397
  if (head->isHbParam) {
    SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
L
Liu Jicong 已提交
1398
    SSchTrans            trans = {.transInst = hbParam->transport, .transHandle = NULL};
D
dapan1121 已提交
1399 1400 1401 1402 1403 1404 1405 1406 1407 1408
    SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));

    SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId));
  } else {
    SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1409
/*
 * Resolve the RPC response callback for a request message type.
 *
 * On a match *fp is set and TSDB_CODE_SUCCESS is returned. An unknown msgType
 * leaves *fp untouched and yields TSDB_CODE_QRY_APP_ERROR.
 */
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
  static const struct {
    int32_t              msgType;
    __async_send_cb_fn_t cb;
  } cbTable[] = {
      {TDMT_VND_CREATE_TABLE, schHandleCreateTableCallback},
      {TDMT_VND_SUBMIT, schHandleSubmitCallback},
      {TDMT_VND_QUERY, schHandleQueryCallback},
      {TDMT_VND_RES_READY, schHandleReadyCallback},
      {TDMT_VND_EXPLAIN, schHandleExplainCallback},
      {TDMT_VND_FETCH, schHandleFetchCallback},
      {TDMT_VND_DROP_TASK, schHandleDropCallback},
      {TDMT_VND_QUERY_HEARTBEAT, schHandleHbCallback},
      {TDMT_SCH_LINK_BROKEN, schHandleLinkBrokenCallback},
  };

  int32_t entryNum = (int32_t)(sizeof(cbTable) / sizeof(cbTable[0]));
  for (int32_t i = 0; i < entryNum; ++i) {
    if (cbTable[i].msgType == msgType) {
      *fp = cbTable[i].cb;
      return TSDB_CODE_SUCCESS;
    }
  }

  qError("unknown msg type for callback, msgType:%d", msgType);
  SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482
/*
 * Build the SMsgSendInfo (callback + SSchTaskCallbackParam) for sending msgType for pTask.
 *
 * On success *pMsgSendInfo owns both allocations. On any failure (allocation or
 * unknown msgType) everything allocated here is freed and *pMsgSendInfo is not written.
 */
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
  int32_t                code = 0;
  SMsgSendInfo          *sendInfo = NULL;
  SSchTaskCallbackParam *cbParam = NULL;
  __async_send_cb_fn_t   cbFp = NULL;

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  cbParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (cbParam == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &cbFp));

  cbParam->queryId = pJob->queryId;
  cbParam->refId = pJob->refId;
  cbParam->taskId = SCH_TASK_ID(pTask);
  cbParam->transport = pJob->transport;

  sendInfo->param = cbParam;
  sendInfo->fp = cbFp;
  *pMsgSendInfo = sendInfo;

  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFree(cbParam);
  taosMemoryFree(sendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1483
void schFreeRpcCtxVal(const void *arg) {
D
dapan1121 已提交
1484 1485 1486
  if (NULL == arg) {
    return;
  }
L
Liu Jicong 已提交
1487 1488

  SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
D
dapan1121 已提交
1489 1490
  taosMemoryFreeClear(pMsgSendInfo->param);
  taosMemoryFreeClear(pMsgSendInfo);
D
dapan1121 已提交
1491
}
D
dapan1121 已提交
1492

D
dapan1121 已提交
1493
/*
 * Allocate a task-shaped callback parameter (head.isHbParam == false) for pTask in pJob.
 * On success *pParam receives the new heap object, which the caller owns.
 */
int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchTaskCallbackParam *pCb = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (pCb == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Every field not named here (including head.isHbParam) stays zero, as after calloc.
  *pCb = (SSchTaskCallbackParam){
      .queryId = pJob->queryId,
      .refId = pJob->refId,
      .taskId = SCH_TASK_ID(pTask),
      .transport = pJob->transport,
  };

  *pParam = pCb;
  return TSDB_CODE_SUCCESS;
}

/*
 * Allocate a heartbeat-shaped callback parameter (head.isHbParam == true).
 *
 * The node id and current endpoint come from the task's current candidate address.
 * On success *pParam receives the new heap object, which the caller owns.
 */
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchHbCallbackParam *pHbParam = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (pHbParam == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SQueryNodeAddr *pCurAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  // Flag the layout so schCloneCallbackParam() copies it as an SSchHbCallbackParam.
  pHbParam->head.isHbParam = true;
  pHbParam->transport = pJob->transport;
  pHbParam->nodeEpId.nodeId = pCurAddr->nodeId;
  memcpy(&pHbParam->nodeEpId.ep, SCH_GET_CUR_EP(pCurAddr), sizeof(SEp));

  *pParam = pHbParam;
  return TSDB_CODE_SUCCESS;
}

/*
 * Fill brokenVal with the TDMT_SCH_LINK_BROKEN handler for pTask.
 *
 * A fresh SMsgSendInfo is allocated. Its param is heartbeat-shaped when isHb is true and
 * task-shaped otherwise, and its callback is the one schGetCallbackFp() returns for
 * TDMT_SCH_LINK_BROKEN. brokenVal also gets the clone/free hooks used by the RPC layer.
 *
 * brokenVal is written only on success. On failure everything allocated here is freed.
 */
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
  int32_t              code = 0;
  int32_t              msgType = TDMT_SCH_LINK_BROKEN;
  __async_send_cb_fn_t fp = NULL;
  SMsgSendInfo        *pMsgSendInfo = NULL;

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (isHb) {
    SCH_ERR_JRET(schMakeHbCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  } else {
    SCH_ERR_JRET(schMakeTaskCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  pMsgSendInfo->fp = fp;

  brokenVal->msgType = msgType;
  brokenVal->val = pMsgSendInfo;
  brokenVal->clone = schCloneSMsgSendInfo;
  brokenVal->freeFunc = schFreeRpcCtxVal;

  return TSDB_CODE_SUCCESS;

_return:

  // pMsgSendInfo is NULL when its own allocation failed; do not dereference it then.
  if (pMsgSendInfo) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1567
/*
 * Build the RPC context attached to a TDMT_VND_QUERY request.
 *
 * pCtx->args maps TDMT_VND_RES_READY_RSP and TDMT_VND_EXPLAIN_RSP to SMsgSendInfo handles created
 * by schGenerateTaskCallBackAHandle(). pCtx->brokenVal is filled with the link-broken handler
 * for this task.
 *
 * On failure all allocations made here are released and pCtx->args is reset to NULL, so the
 * caller never holds a pointer to a destroyed hash.
 */
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t       code = 0;
  SMsgSendInfo *pReadyMsgSendInfo = NULL;
  SMsgSendInfo *pExplainMsgSendInfo = NULL;

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo));
  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));

  int32_t    msgType = TDMT_VND_RES_READY_RSP;
  SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  msgType = TDMT_VND_EXPLAIN_RSP;
  ctxVal.val = pExplainMsgSendInfo;
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));

  return TSDB_CODE_SUCCESS;

_return:

  // NOTE(review): the hash stores copies of SRpcCtxVal. This cleanup assumes taosHashCleanup does
  // not invoke freeFunc on the values, so the SMsgSendInfo objects are freed explicitly below.
  taosHashCleanup(pCtx->args);
  pCtx->args = NULL;

  if (pReadyMsgSendInfo) {
    taosMemoryFreeClear(pReadyMsgSendInfo->param);
    taosMemoryFreeClear(pReadyMsgSendInfo);
  }

  if (pExplainMsgSendInfo) {
    taosMemoryFreeClear(pExplainMsgSendInfo->param);
    taosMemoryFreeClear(pExplainMsgSendInfo);
  }

  SCH_RET(code);
}

/*
 * Build the RPC context used on a heartbeat connection.
 *
 * pCtx->args maps TDMT_VND_QUERY_HEARTBEAT_RSP to an SMsgSendInfo. That SMsgSendInfo uses the
 * TDMT_VND_QUERY_HEARTBEAT callback and a heartbeat-shaped param holding the target node/endpoint
 * (from the task's current candidate address) and the job transport.
 * pCtx->brokenVal is filled with a heartbeat-shaped link-broken handler.
 *
 * On failure all allocations made here are released and pCtx->args is reset to NULL.
 */
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t              code = 0;
  SSchHbCallbackParam *param = NULL;
  SMsgSendInfo        *pMsgSendInfo = NULL;
  SQueryNodeAddr      *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId       epId = {0};

  epId.nodeId = addr->nodeId;
  memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (NULL == param) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  int32_t              msgType = TDMT_VND_QUERY_HEARTBEAT_RSP;
  __async_send_cb_fn_t fp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));

  // Without this flag schCloneCallbackParam() would clone the param with the
  // SSchTaskCallbackParam layout and read transport from the wrong offset
  // (schMakeHbCallbackParam sets the same flag).
  param->head.isHbParam = true;
  param->nodeEpId = epId;
  param->transport = pJob->transport;

  pMsgSendInfo->param = param;
  pMsgSendInfo->fp = fp;

  SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(pCtx->args);
  pCtx->args = NULL;

  // param and pMsgSendInfo are freed separately; freeing pMsgSendInfo does not touch its param.
  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1673
/*
 * Register a heartbeat transport for epId in schMgmt.hbConnections.
 *
 * A new SSchHbTrans (job transport plus a heartbeat RPC context) is built and inserted.
 * - If another entry already exists for epId, the new context is discarded, *exist is set to
 *   true and success is returned.
 * - On a clean insert *exist is left untouched, so callers must initialize it.
 * - Any other insert failure is logged and returned.
 */
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  SSchHbTrans hbTrans = {0};
  hbTrans.trans.transInst = pJob->transport;

  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hbTrans.rpcCtx));

  int32_t putCode = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hbTrans, sizeof(SSchHbTrans));
  if (putCode == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // The entry was not stored, so the context we built is ours to release.
  schFreeRpcCtx(&hbTrans.rpcCtx);

  if (HASH_NODE_EXIST(putCode)) {
    *exist = true;
    return TSDB_CODE_SUCCESS;
  }

  qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
  SCH_ERR_RET(putCode);

  return TSDB_CODE_SUCCESS;
}

/*
 * Duplicate a callback parameter.
 *
 * pSrc->isHbParam selects the concrete layout: SSchHbCallbackParam when true,
 * SSchTaskCallbackParam otherwise. The copy is a byte-wise copy of that struct.
 * On success *pDst receives a new heap object owned by the caller.
 */
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
  bool   isHb = pSrc->isHbParam;
  size_t paramSize = isHb ? sizeof(SSchHbCallbackParam) : sizeof(SSchTaskCallbackParam);
  void  *pCopy = taosMemoryMalloc(paramSize);

  if (NULL == pCopy) {
    if (isHb) {
      qError("malloc SSchHbCallbackParam failed");
    } else {
      qError("malloc SSchTaskCallbackParam failed");
    }
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(pCopy, pSrc, paramSize);
  *pDst = (SSchCallbackParamHeader *)pCopy;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1723 1724
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
  SMsgSendInfo *pSrc = src;
L
Liu Jicong 已提交
1725
  int32_t       code = 0;
D
dapan1121 已提交
1726
  SMsgSendInfo *pDst = taosMemoryMalloc(sizeof(*pSrc));
D
dapan1121 已提交
1727
  if (NULL == pDst) {
D
dapan1121 已提交
1728 1729 1730 1731
    qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1732 1733
  memcpy(pDst, pSrc, sizeof(*pSrc));
  pDst->param = NULL;
D
dapan1121 已提交
1734

D
dapan1121 已提交
1735
  SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
D
dapan1121 已提交
1736

D
dapan1121 已提交
1737
  *dst = pDst;
D
dapan1121 已提交
1738

D
dapan1121 已提交
1739
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1740

D
dapan1121 已提交
1741 1742
_return:

D
dapan1121 已提交
1743
  taosMemoryFreeClear(pDst);
D
dapan1121 已提交
1744 1745 1746 1747 1748 1749 1750
  SCH_RET(code);
}

/*
 * Deep-copy a heartbeat RPC context from pSrc into pDst.
 *
 * brokenVal is copied by value and its val is cloned via schCloneSMsgSendInfo(). Every entry of
 * pSrc->args is cloned into a freshly created pDst->args under the same message-type key.
 *
 * On failure pDst is released with schFreeRpcCtx(). The exception is a failure to clone
 * brokenVal.val, which returns before anything else has been allocated.
 * An unfinished iteration over pSrc->args is cancelled so the source hash is not left with a
 * dangling iterator reference.
 */
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
  int32_t code = 0;
  void   *pIter = NULL;

  memcpy(&pDst->brokenVal, &pSrc->brokenVal, sizeof(pSrc->brokenVal));
  pDst->brokenVal.val = NULL;

  SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));

  pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pDst->args) {
    qError("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pIter = taosHashIterate(pSrc->args, NULL);
  while (pIter) {
    SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
    int32_t    *msgType = taosHashGetKey(pIter, NULL);
    SRpcCtxVal  dst = *pVal;
    dst.val = NULL;

    SCH_ERR_JRET(schCloneSMsgSendInfo(pVal->val, &dst.val));

    if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) {
      qError("taosHashPut msg %d to rpcCtx failed", *msgType);
      (*dst.freeFunc)(dst.val);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pIter = taosHashIterate(pSrc->args, pIter);
  }

  return TSDB_CODE_SUCCESS;

_return:

  // Leaving the iteration early must release the iterator explicitly.
  if (pIter) {
    taosHashCancelIterate(pSrc->args, pIter);
  }

  schFreeRpcCtx(pDst);
  SCH_RET(code);
}

L
Liu Jicong 已提交
1788 1789
/*
 * Send msg (msgSize bytes) of type msgType to epSet on behalf of pTask.
 *
 * transport: an SSchTrans * supplying the transport instance and connection handle.
 * ctx:       optional RPC context forwarded to the transport; may be NULL.
 *
 * msg is attached to the generated SMsgSendInfo but not freed here on failure; the caller keeps
 * ownership of msg when an error is returned. The SMsgSendInfo is freed on every error path.
 */
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet *epSet, int32_t msgType, void *msg,
                        uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
  int32_t       code = 0;
  SSchTrans    *trans = (SSchTrans *)transport;
  SMsgSendInfo *pMsgSendInfo = NULL;

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, msgType, &pMsgSendInfo));

  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans->transHandle;
  pMsgSendInfo->msgType = msgType;

  qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 ", instance:%p, handle:%p", TMSG_INFO(msgType),
         ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
         trans->transInst, trans->transHandle);

  int64_t transporterId = 0;
  code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
  if (code) {
    SCH_ERR_JRET(code);
  }

  SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType));
  return TSDB_CODE_SUCCESS;

_return:

  if (pMsgSendInfo) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1825 1826
/*
 * Serialize and send a TDMT_VND_QUERY_HEARTBEAT request to nodeEpId.
 *
 * The transport and RPC context are taken (cloned, under hb->lock) from the connection previously
 * registered in schMgmt.hbConnections. If no connection is registered, an error is returned.
 * Every error path releases the message buffer, the SMsgSendInfo/param and the cloned RPC context.
 */
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
  SSchedulerHbReq        req = {0};
  int32_t                code = 0;
  SRpcCtx                rpcCtx = {0};
  SSchTrans              trans = {0};
  int32_t                msgType = TDMT_VND_QUERY_HEARTBEAT;
  int32_t                msgSize = 0;
  // Declared (and NULL-initialized) up front: every `goto _return` below would otherwise jump
  // past their initializers and _return would free indeterminate pointers.
  void                  *msg = NULL;
  SMsgSendInfo          *pMsgSendInfo = NULL;
  SSchTaskCallbackParam *param = NULL;

  req.header.vgId = nodeEpId->nodeId;
  req.sId = schMgmt.sId;
  memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));

  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
           nodeEpId->ep.port);
    // `code` is still 0 here, so SCH_ERR_RET(code) would fall through and dereference NULL hb.
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
  memcpy(&trans, &hb->trans, sizeof(trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  // schCloneHbRpcCtx() cleans up after itself on failure, so nothing to release yet.
  SCH_ERR_RET(code);

  msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    qError("calloc hb req %d failed", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    qError("calloc SMsgSendInfo failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == param) {
    qError("calloc SSchTaskCallbackParam failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  __async_send_cb_fn_t fp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  param->transport = trans.transInst;

  pMsgSendInfo->param = param;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans.transHandle;
  pMsgSendInfo->msgType = msgType;
  pMsgSendInfo->fp = fp;

  int64_t transporterId = 0;
  SEpSet  epSet = {.inUse = 0, .numOfEps = 1};
  memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));

  qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
         nodeEpId->ep.fqdn, nodeEpId->ep.port);

  code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
  if (code) {
    qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
           trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
    SCH_ERR_JRET(code);
  }

  qDebug("hb msg sent");
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFreeClear(msg);
  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);
  schFreeRpcCtx(&rpcCtx);
  SCH_RET(code);
}

D
dapan1121 已提交
1916
/*
 * Serialize a request of type msgType for pTask and send it asynchronously.
 *
 * addr:    target node. When NULL, the task's current candidate address
 *          (pTask->candidateAddrs[pTask->candidateIdx]) is used.
 * msgType: one of TDMT_VND_CREATE_TABLE, TDMT_VND_SUBMIT, TDMT_VND_QUERY, TDMT_VND_RES_READY,
 *          TDMT_VND_FETCH, TDMT_VND_DROP_TASK or TDMT_VND_QUERY_HEARTBEAT. Anything else fails
 *          with TSDB_CODE_SCH_INTERNAL_ERROR.
 *
 * TDMT_VND_QUERY and TDMT_VND_QUERY_HEARTBEAT requests carry an RPC context and use a persistent
 * handle. If anything fails after that context has been built, the context and the message
 * buffer are released via _return, and the task's last message type is reset to -1.
 */
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
  uint32_t msgSize = 0;
  void    *msg = NULL;
  int32_t  code = 0;
  bool     persistHandle = false;
  SRpcCtx  rpcCtx = {0};

  if (NULL == addr) {
    addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  }

  SEpSet epSet = addr->epSet;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_SUBMIT: {
      msgSize = pTask->msgLen;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      memcpy(msg, pTask->msg, msgSize);
      break;
    }

    case TDMT_VND_QUERY: {
      SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));

      // Layout: SSubQueryMsg header, then the SQL text, then the serialized physical plan.
      uint32_t len = strlen(pJob->sql);
      msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        // rpcCtx is already populated: release it through _return instead of leaking it.
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSubQueryMsg *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      pMsg->taskType = TASK_TYPE_TEMP;
      pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
      pMsg->phyLen = htonl(pTask->msgLen);
      pMsg->sqlLen = htonl(len);

      memcpy(pMsg->msg, pJob->sql, len);
      memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);

      persistHandle = true;
      break;
    }

    case TDMT_VND_RES_READY: {
      msgSize = sizeof(SResReadyReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResReadyReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      break;
    }

    case TDMT_VND_FETCH: {
      msgSize = sizeof(SResFetchReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResFetchReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);

      break;
    }

    case TDMT_VND_DROP_TASK: {
      msgSize = sizeof(STaskDropReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      STaskDropReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      break;
    }

    case TDMT_VND_QUERY_HEARTBEAT: {
      SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));

      SSchedulerHbReq req = {0};
      req.sId = schMgmt.sId;
      req.header.vgId = addr->nodeId;
      req.epId.nodeId = addr->nodeId;
      memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

      // The serializer returns a signed size and a negative value on failure. Check it before
      // storing it in the unsigned msgSize, where `< 0` could never be true.
      int32_t hbReqSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
      if (hbReqSize < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", hbReqSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgSize = (uint32_t)hbReqSize;

      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_JOB_ELOG("calloc %d failed", msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      persistHandle = true;
      break;
    }

    default:
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);

  SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
  SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
                               (rpcCtx.args ? &rpcCtx : NULL)));

  // The message has been handed to the transport; from here on msg/rpcCtx are no longer
  // released by this function.
  if (msgType == TDMT_VND_QUERY) {
    SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
  schFreeRpcCtx(&rpcCtx);

  taosMemoryFreeClear(msg);
  SCH_RET(code);
}
D
dapan1121 已提交
2080

D
dapan1121 已提交
2081 2082
/*
 * Make sure a heartbeat connection exists for the node at the task's current candidate address.
 *
 * If none is registered yet, one is registered. The first heartbeat is sent only when this call
 * won the registration, i.e. schRegisterHbConnection() did not find a concurrent entry.
 */
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pCurAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  nodeEpId = {0};

  nodeEpId.nodeId = pCurAddr->nodeId;
  memcpy(&nodeEpId.ep, SCH_GET_CUR_EP(pCurAddr), sizeof(SEp));

  if (taosHashGet(schMgmt.hbConnections, &nodeEpId, sizeof(SQueryNodeEpId)) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool alreadyRegistered = false;
  SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &nodeEpId, &alreadyRegistered));

  if (!alreadyRegistered) {
    SCH_ERR_RET(schBuildAndSendHbMsg(&nodeEpId));
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
2101

D
dapan1121 已提交
2102
/*
 * Launch one task. The steps run in this order:
 *   1. Count it as launched on its level.
 *   2. Stop early, returning the job's error code, if the job no longer needs to run.
 *   3. Move the task into the exec list / EXECUTING state if it is not already there.
 *   4. Serialize its physical plan on first launch.
 *   5. Resolve candidate addresses.
 *   6. For query jobs, ensure a heartbeat connection exists.
 *   7. Send the plan's message.
 */
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  int8_t  status = 0;

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status));
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  // Race guard: the task must be in the exec hash table before any message is sent to the server.
  bool alreadyExecuting = (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING);
  if (!alreadyExecuting) {
    SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
  }

  SSubplan *pPlan = pTask->plan;

  // The serialized plan is cached on the task, so later launches reuse it.
  if (pTask->msg == NULL) {  // TODO add more detailed reason for failure
    code = qSubPlanToString(pPlan, &pTask->msg, &pTask->msgLen);
    if (code != TSDB_CODE_SUCCESS) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }

    SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, pPlan->msgType));

  return TSDB_CODE_SUCCESS;
}

// Note: no more error processing, handled in function internal
/*
 * Launch pTask, respecting flow control when the task requires it.
 *
 * Under flow control the task is launched only if schCheckIncTaskFlowQuota() reports enough
 * quota; otherwise it is launched unconditionally. Errors are not propagated directly: any
 * failure is routed through schProcessOnTaskFailure(), whose result is returned.
 */
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  bool    enough = false;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  bool underFlowCtrl = SCH_TASK_NEED_FLOW_CTRL(pJob, pTask);
  if (underFlowCtrl) {
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
  }

  if (!underFlowCtrl || enough) {
    SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;

_return:
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
D
dapan1121 已提交
2167

D
dapan1121 已提交
2168
/*
 * Launch every task of one level, in array order. Stops at, and returns, the first error
 * reported by schLaunchTask().
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  int32_t taskIdx = 0;

  while (taskIdx < level->taskNum) {
    SSchTask *pCurTask = taosArrayGet(level->subTasks, taskIdx);
    ++taskIdx;

    SCH_ERR_RET(schLaunchTask(pJob, pCurTask));
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Start executing pJob from its current level (pJob->levelIdx). Steps:
 *   1. Mark the job EXECUTING.
 *   2. Decide whether that level needs flow control.
 *   3. Launch its tasks.
 */
int32_t schLaunchJob(SSchJob *pJob) {
  SSchLevel *pCurLevel = taosArrayGet(pJob->levels, pJob->levelIdx);

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));
  SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, pCurLevel));
  SCH_ERR_RET(schLaunchLevelTasks(pJob, pCurLevel));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2190
/*
 * Send TDMT_VND_DROP_TASK to every node the task was executed on.
 *
 * Before each send, the task handle is switched to the one recorded for that node.
 * Send failures are ignored (best effort).
 */
void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->execNodes == NULL) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosArrayGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  for (int32_t nodeIdx = 0; nodeIdx < nodeNum; ++nodeIdx) {
    SSchNodeInfo *pNode = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, nodeIdx);

    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_VND_DROP_TASK);
  }

  SCH_TASK_DLOG("task has %d exec address", nodeNum);
}

/*
 * Drop, on their executed nodes, all tasks stored (as SSchTask *) in the hash `list`.
 * Does nothing unless SCH_IS_NEED_DROP_JOB(pJob) holds.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_IS_NEED_DROP_JOB(pJob)) {
    return;
  }

  for (void *pEntry = taosHashIterate(list, NULL); pEntry != NULL; pEntry = taosHashIterate(list, pEntry)) {
    schDropTaskOnExecutedNode(pJob, *(SSchTask **)pEntry);
  }
}
H
Haojun Liao 已提交
2228

D
dapan1121 已提交
2229 2230 2231 2232
/*
 * Issue drop requests for all tasks of a job: executing, succeeded and failed,
 * in that order.
 */
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *taskLists[] = {pJob->execTasks, pJob->succTasks, pJob->failTasks};

  for (size_t idx = 0; idx < sizeof(taskLists) / sizeof(taskLists[0]); ++idx) {
    schDropTaskInHashList(pJob, taskLists[idx]);
  }
}
2234

D
dapan1121 已提交
2235
/*
 * Cancel a running job.
 *
 * Currently a placeholder: it performs no work and always reports success.
 * TODO: move all tasks from the exec list to the fail list and cancel them.
 */
int32_t schCancelJob(SSchJob *pJob) {
  (void)pJob;  // unused until cancellation is implemented
  return TSDB_CODE_SUCCESS;
}

void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
L
Liu Jicong 已提交
2248
  int64_t  refId = pJob->refId;
D
dapan1121 已提交
2249 2250 2251 2252 2253 2254 2255

  if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
    schCancelJob(pJob);
  }

  schDropJobAllTasks(pJob);

L
Liu Jicong 已提交
2256 2257
  pJob->subPlans = NULL;  // it is a reference to pDag->pSubplans

D
dapan1121 已提交
2258
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
L
Liu Jicong 已提交
2259
  for (int32_t i = 0; i < numOfLevels; ++i) {
D
dapan1121 已提交
2260 2261
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

D
dapan1121 已提交
2262
    schFreeFlowCtrl(pLevel);
L
Liu Jicong 已提交
2263

D
dapan1121 已提交
2264
    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
L
Liu Jicong 已提交
2265 2266
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
2267 2268 2269 2270 2271
      schFreeTask(pTask);
    }

    taosArrayDestroy(pLevel->subTasks);
  }
L
Liu Jicong 已提交
2272

D
dapan1121 已提交
2273 2274 2275
  taosHashCleanup(pJob->execTasks);
  taosHashCleanup(pJob->failTasks);
  taosHashCleanup(pJob->succTasks);
L
Liu Jicong 已提交
2276

D
dapan1121 已提交
2277 2278
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
L
Liu Jicong 已提交
2279

D
dapan1121 已提交
2280 2281
  qExplainFreeCtx(pJob->explainCtx);

wafwerar's avatar
wafwerar 已提交
2282 2283
  taosMemoryFreeClear(pJob->resData);
  taosMemoryFreeClear(pJob);
D
dapan1121 已提交
2284

L
Liu Jicong 已提交
2285
  qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
2286 2287
}

L
Liu Jicong 已提交
2288
/*
 * Create and launch a job for the query plan `pDag`.
 *
 * @param transport     RPC transport handed to schInitJob().
 * @param pNodeList     candidate execution nodes; an empty/NULL list is only logged.
 * @param pDag          query plan to execute.
 * @param job           out: refId of the created job (set only after a successful launch).
 * @param sql           original SQL text, kept by the job.
 * @param startTs       query start timestamp.
 * @param syncSchedule  when true, block on pJob->rspSem until the job signals a response.
 * @return TSDB_CODE_SUCCESS or the error raised by schInitJob()/schLaunchJob().
 *
 * NOTE(review): on failure the job is freed directly with schFreeJobImpl(). If
 * schInitJob() already registered it in schMgmt.jobRef, the pool still holds it,
 * which could free it twice. Verify against schInitJob().
 */
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                              int64_t startTs, bool syncSchedule) {
  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  bool noInputNodes = (pNodeList == NULL) || (taosArrayGetSize(pNodeList) <= 0);
  if (noInputNodes) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
  }

  int32_t  code = 0;
  SSchJob *pJob = NULL;

  SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));
  SCH_ERR_JRET(schLaunchJob(pJob));

  *job = pJob->refId;

  if (syncSchedule) {
    SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  // Drop the reference acquired while creating the job; the ref pool keeps it alive.
  schReleaseJob(pJob->refId);
  return TSDB_CODE_SUCCESS;

_return:
  schFreeJobImpl(pJob);
  SCH_RET(code);
}
D
dapan1121 已提交
2320

D
dapan1121 已提交
2321
/*
 * Build a job for a static EXPLAIN: the plan text is produced locally by
 * qExecStaticExplain() and stored as the job's result, so no tasks are sent.
 *
 * @param transport, pNodeList  unused; kept for signature parity with schExecJobImpl().
 * @param pDag                  plan to explain.
 * @param job                   out: refId of the registered job.
 * @param sql                   original SQL text (referenced, not copied).
 * @param syncSchedule          unused.
 * @return TSDB_CODE_SUCCESS or an error code.
 */
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                             bool syncSchedule) {
  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->sql = sql;
  pJob->attr.queryJob = true;
  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->queryId = pDag->queryId;
  pJob->subPlans = pDag->pSubplans;

  SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));

  int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    // The job is now owned by the ref pool. Remove it so the pool releases it
    // through schFreeJobImpl() instead of leaking it. Do not jump to _return:
    // that would free the job a second time.
    taosRemoveRef(schMgmt.jobRef, refId);
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  // The whole result is already available, so the job is immediately fetchable.
  pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
  *job = pJob->refId;
  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  // Only reached before the job was registered in the ref pool.
  schFreeJobImpl(pJob);
  SCH_RET(code);
}

D
dapan1121 已提交
2369
/*
 * Initialize the global scheduler state (schMgmt).
 *
 * Copies `cfg` if given, substituting defaults for a zero maxJobNum and a
 * non-positive maxNodeTableNum; uses defaults entirely when cfg is NULL.
 * It then opens the job ref pool (with schFreeJobImpl as its free callback),
 * creates the heartbeat-connection hash and generates the scheduler id.
 *
 * On any failure the partially created state is released and schMgmt.jobRef is
 * reset to 0, so a later call can retry instead of seeing "already initialized".
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT if already initialized,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY or TSDB_CODE_QRY_SYS_ERROR on failure.
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  if (schMgmt.jobRef) {
    qError("scheduler already initialized");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (cfg) {
    schMgmt.cfg = *cfg;

    if (schMgmt.cfg.maxJobNum == 0) {
      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    }
    if (schMgmt.cfg.maxNodeTableNum <= 0) {
      schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
    }
  } else {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
  }

  schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
  if (schMgmt.jobRef < 0) {
    qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum);
    // A negative id would otherwise make every later init report "already initialized".
    schMgmt.jobRef = 0;
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  schMgmt.hbConnections = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == schMgmt.hbConnections) {
    qError("taosHashInit hb connections failed");
    taosCloseRef(schMgmt.jobRef);
    schMgmt.jobRef = 0;
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
    qError("generate schdulerId failed, errno:%d", errno);
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
    taosCloseRef(schMgmt.jobRef);
    schMgmt.jobRef = 0;
    SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
  }

  qInfo("scheduler %" PRIx64 " initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2411
/*
 * Execute a query plan synchronously and report its outcome.
 *
 * Static EXPLAIN plans go through schExecStaticExplain(); all others through
 * schExecJobImpl(), both in synchronous mode.
 *
 * @param pJob  out: refId of the created job.
 * @param pRes  out: receives the job's errCode and resNumOfRows.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments,
 *         TSDB_CODE_SCH_STATUS_ERROR if the job can no longer be acquired,
 *         or the error from launching the job.
 */
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
                         int64_t startTs, SQueryResult *pRes) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
    SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
  } else {
    SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
  }

  // The exec helper released its own reference, so the job may have been dropped
  // meanwhile; never dereference a failed acquire.
  SSchJob *job = schAcquireJob(*pJob);
  if (NULL == job) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, *pJob);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pRes->code = atomic_load_32(&job->errCode);
  pRes->numOfRows = job->resNumOfRows;

  schReleaseJob(*pJob);

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2433
/*
 * Launch a query plan without waiting for its completion.
 *
 * @param pJob  out: refId of the created job.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments,
 *         or the error from launching the job.
 */
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
  bool badInput = (NULL == transport) || (NULL == pDag) || (NULL == pDag->pSubplans) || (NULL == pJob);
  if (badInput) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  bool    staticExplain = (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode);
  int32_t execCode = staticExplain ? schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false)
                                   : schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false);
  SCH_ERR_RET(execCode);

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2447
#if 0
/*
 * Disabled code. Re-enabling it also requires re-enabling schGenUUID(), which is
 * compiled out as well.
 *
 * Convert a single-level query plan into a list of STaskInfo, each carrying a
 * freshly allocated SSubQueryMsg. On success *pTasks owns the list; free it with
 * schedulerFreeTaskList().
 */
int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
  if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = LIST_LENGTH(pDag->pSubplans);
  if (1 != levelNum) {
    qError("invalid level num: %d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SNodeListNode *plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, 0);
  int32_t taskNum = LIST_LENGTH(plans->pNodeList);
  if (taskNum <= 0) {
    qError("invalid task num: %d", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo));
  if (NULL == info) {
    qError("taosArrayInit %d taskInfo failed", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  STaskInfo tInfo = {0};
  char *msg = NULL;
  int32_t msgLen = 0;
  int32_t code = 0;

  for (int32_t i = 0; i < taskNum; ++i) {
    SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, i);
    tInfo.addr = plan->execNode;

    code = qSubPlanToString(plan, &msg, &msgLen);
    if (TSDB_CODE_SUCCESS != code) {
      qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);
      SCH_ERR_JRET(code);
    }

    if (NULL == msg) {
      qError("subplanToString returned no msg, len:%d", msgLen);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;
    SSubQueryMsg* pMsg = taosMemoryCalloc(1, msgSize);
    if (NULL == pMsg) {  // previously unchecked
      qError("calloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pMsg->header.vgId = tInfo.addr.nodeId;

    pMsg->sId      = schMgmt.sId;
    pMsg->queryId  = plan->id.queryId;
    pMsg->taskId   = schGenUUID();
    pMsg->taskType = TASK_TYPE_PERSISTENT;
    pMsg->phyLen   = msgLen;
    pMsg->sqlLen   = 0;
    memcpy(pMsg->msg, msg, msgLen);

    // The serialized plan has been copied into pMsg; release it (it used to leak every iteration).
    taosMemoryFreeClear(msg);

    tInfo.msg = pMsg;

    if (NULL == taosArrayPush(info, &tInfo)) {
      qError("taosArrayPush failed, idx:%d", i);
      taosMemoryFree(pMsg);  // pMsg is not owned by the list yet
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  *pTasks = info;
  info = NULL;

_return:
  taosMemoryFreeClear(msg);
  schedulerFreeTaskList(info);
  SCH_RET(code);
}

/*
 * Make `copyNum` copies of `src` into a new array *dst, each with its own message
 * buffer and a fresh taskId. On failure *dst is freed and set to NULL.
 */
int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
  if (NULL == src || NULL == dst || copyNum <= 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;

  *dst = taosArrayInit(copyNum, sizeof(STaskInfo));
  if (NULL == *dst) {
    qError("taosArrayInit %d taskInfo failed", copyNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  int32_t msgSize = src->msg->phyLen + sizeof(*src->msg);
  STaskInfo info = {0};

  info.addr = src->addr;

  for (int32_t i = 0; i < copyNum; ++i) {
    info.msg = taosMemoryMalloc(msgSize);
    if (NULL == info.msg) {
      qError("malloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(info.msg, src->msg, msgSize);

    info.msg->taskId = schGenUUID();

    if (NULL == taosArrayPush(*dst, &info)) {
      qError("taosArrayPush failed, idx:%d", i);
      taosMemoryFree(info.msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  schedulerFreeTaskList(*dst);
  *dst = NULL;

  SCH_RET(code);
}
#endif
D
dapan1121 已提交
2569

L
Liu Jicong 已提交
2570
int32_t schedulerFetchRows(int64_t job, void **pData) {
D
dapan1121 已提交
2571
  if (NULL == pData) {
D
dapan1121 已提交
2572
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
2573 2574
  }

L
Liu Jicong 已提交
2575
  int32_t  code = 0;
D
dapan1121 已提交
2576
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2577 2578 2579 2580
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2581

D
dapan1121 已提交
2582 2583
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (status == JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
2584
    SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2585
    schReleaseJob(job);
D
dapan1121 已提交
2586
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
2587 2588
  }

D
dapan1121 已提交
2589
  if (!SCH_JOB_NEED_FETCH(pJob)) {
D
dapan1121 已提交
2590
    SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
D
dapan1121 已提交
2591
    schReleaseJob(job);
D
dapan1121 已提交
2592
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
2593 2594
  }

D
dapan1121 已提交
2595 2596
  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
D
dapan1121 已提交
2597
    schReleaseJob(job);
D
dapan1121 已提交
2598
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
2599 2600
  }

D
dapan1121 已提交
2601
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2602
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2603 2604
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
  } else if (status == JOB_TASK_STATUS_SUCCEED) {
D
dapan1121 已提交
2605
    SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2606 2607
    goto _return;
  } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan1121 已提交
2608
    if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
D
dapan1121 已提交
2609 2610 2611 2612 2613 2614
      SCH_ERR_JRET(schFetchFromRemote(pJob));
      tsem_wait(&pJob->rspSem);
    } 
  } else {
    SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan 已提交
2615 2616
  }

D
dapan1121 已提交
2617
  status = SCH_GET_JOB_STATUS(pJob);
D
dapan 已提交
2618

D
dapan1121 已提交
2619
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2620
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2621
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
D
dapan 已提交
2622
  }
L
Liu Jicong 已提交
2623

D
dapan1121 已提交
2624
  if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
D
dapan1121 已提交
2625
    SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
D
dapan 已提交
2626 2627
  }

D
dapan1121 已提交
2628
  while (true) {
D
dapan1121 已提交
2629 2630
    *pData = atomic_load_ptr(&pJob->resData);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
D
dapan1121 已提交
2631 2632 2633 2634 2635
      continue;
    }

    break;
  }
D
dapan 已提交
2636

D
dapan1121 已提交
2637
  if (NULL == *pData) {
wafwerar's avatar
wafwerar 已提交
2638
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
2639 2640 2641 2642 2643
    if (rsp) {
      rsp->completed = 1;
    }

    *pData = rsp;
D
dapan1121 已提交
2644
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
D
dapan1121 已提交
2645
  }
D
dapan1121 已提交
2646

2647
  SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
D
dapan1121 已提交
2648 2649 2650 2651

_return:

  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
L
Liu Jicong 已提交
2652

D
dapan1121 已提交
2653
  schReleaseJob(job);
D
dapan 已提交
2654

D
dapan1121 已提交
2655
  SCH_RET(code);
D
dapan 已提交
2656
}
D
dapan1121 已提交
2657

D
dapan1121 已提交
2658
int32_t scheduleCancelJob(int64_t job) {
D
dapan1121 已提交
2659
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2660 2661 2662 2663
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2664

D
dapan1121 已提交
2665 2666
  int32_t code = schCancelJob(pJob);

D
dapan1121 已提交
2667
  schReleaseJob(job);
D
dapan1121 已提交
2668 2669

  SCH_RET(code);
D
dapan1121 已提交
2670 2671
}

D
dapan1121 已提交
2672
void schedulerFreeJob(int64_t job) {
D
dapan1121 已提交
2673
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2674 2675
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
D
dapan 已提交
2676 2677
    return;
  }
D
dapan1121 已提交
2678

D
dapan1121 已提交
2679 2680
  if (atomic_load_8(&pJob->userFetch) > 0) {
    schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
2681
  }
D
dapan1121 已提交
2682

D
dapan1121 已提交
2683
  SCH_JOB_DLOG("start to remove job from jobRef list, refId:%" PRIx64, job);
2684

D
dapan1121 已提交
2685 2686
  if (taosRemoveRef(schMgmt.jobRef, job)) {
    SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job);
2687
  }
D
dapan1121 已提交
2688 2689

  schReleaseJob(job);
D
dapan1121 已提交
2690
}
D
dapan1121 已提交
2691 2692 2693 2694 2695 2696 2697 2698 2699

/*
 * Free a task list: each STaskInfo's msg buffer, then the array itself.
 * A NULL list is ignored.
 */
void schedulerFreeTaskList(SArray *taskList) {
  if (taskList != NULL) {
    int32_t count = taosArrayGetSize(taskList);
    for (int32_t idx = 0; idx < count; idx++) {
      STaskInfo *pInfo = taosArrayGet(taskList, idx);
      taosMemoryFreeClear(pInfo->msg);
    }

    taosArrayDestroy(taskList);
  }
}
L
Liu Jicong 已提交
2705

D
dapan1121 已提交
2706
void schedulerDestroy(void) {
D
dapan1121 已提交
2707 2708
  if (schMgmt.jobRef) {
    SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
L
Liu Jicong 已提交
2709

D
dapan1121 已提交
2710 2711
    while (pJob) {
      taosRemoveRef(schMgmt.jobRef, pJob->refId);
L
Liu Jicong 已提交
2712

D
dapan1121 已提交
2713 2714
      pJob = taosIterateRef(schMgmt.jobRef, pJob->refId);
    }
L
Liu Jicong 已提交
2715

D
dapan1121 已提交
2716 2717
    taosCloseRef(schMgmt.jobRef);
    schMgmt.jobRef = 0;
D
dapan1121 已提交
2718 2719 2720
  }
}