scheduler.c 82.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "catalog.h"
H
Hongze Cheng 已提交
17
#include "command.h"
L
Liu Jicong 已提交
18
#include "query.h"
D
dapan1121 已提交
19
#include "schedulerInt.h"
H
Hongze Cheng 已提交
20
#include "tmsg.h"
D
dapan1121 已提交
21
#include "tref.h"
D
dapan1121 已提交
22
#include "trpc.h"
23

D
dapan1121 已提交
24 25
// Process-wide scheduler state. The functions below use its job ref pool (jobRef),
// task-id counter (taskId), heartbeat connection table (hbConnections) and sId.
SSchedulerMgmt schMgmt = {0};

L
Liu Jicong 已提交
26
/* Look up a job in schMgmt.jobRef by ref id and acquire a reference on it; NULL on failure. */
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
  void *pRefObj = taosAcquireRef(schMgmt.jobRef, refId);
  return (SSchJob *)pRefObj;
}
D
dapan1121 已提交
27

L
Liu Jicong 已提交
28
/* Release a reference obtained through schAcquireJob(); forwards taosReleaseRef()'s result. */
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
  int32_t releaseRet = taosReleaseRef(schMgmt.jobRef, refId);
  return releaseRet;
}
D
dapan1121 已提交
29

L
Liu Jicong 已提交
30
/* Produce a new task id by atomically incrementing schMgmt.taskId; returns the incremented value. */
uint64_t schGenTaskId(void) {
  uint64_t nextId = atomic_add_fetch_64(&schMgmt.taskId, 1);
  return nextId;
}
D
dapan1121 已提交
31

L
Liu Jicong 已提交
32
// Disabled code, compiled out by #if 0. It would build a 64-bit id from the low
// 12 bits of a MurmurHash3 of the system UUID (bits 52-63), the low 12 bits of the
// pid (bits 40-51), the low 24 bits of the millisecond timestamp (bits 16-39) and
// the low 16 bits of a static serial counter (bits 0-15).
#if 0
D
dapan1121 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
uint64_t schGenUUID(void) {
  static uint64_t hashId = 0;
  static int32_t requestSerialId = 0;

  if (hashId == 0) {
    char    uid[64];
    int32_t code = taosGetSystemUUID(uid, tListLen(uid));
    if (code != TSDB_CODE_SUCCESS) {
      qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    } else {
      hashId = MurmurHash3_32(uid, strlen(uid));
    }
  }

  int64_t ts      = taosGetTimestampMs();
  uint64_t pid    = taosGetPId();
  int32_t val     = atomic_add_fetch_32(&requestSerialId, 1);

  uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
  return id;
}
L
Liu Jicong 已提交
54
#endif
D
dapan1121 已提交
55

L
Liu Jicong 已提交
56 57 58
/*
 * Initialise a freshly created task: bind it to its subplan and level, mark it
 * NOT_START, give it a new task id and allocate its execNodes list.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the execNodes
 * list cannot be allocated.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  pTask->plan = pPlan;
  pTask->level = pLevel;
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
  pTask->taskId = schGenTaskId();

  SArray *pNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo));
  pTask->execNodes = pNodes;
  if (pNodes != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
70
/*
 * Create a scheduler job for a query plan and register it in schMgmt.jobRef.
 *
 * pSchJob      [out] receives the job on success.
 * pDag         query plan; validated and expanded into levels/tasks by schValidateAndBuildJob().
 * transport    stored as pJob->transport.
 * pNodeList    optional node list; the job stores a duplicate made with taosArrayDup().
 * sql          stored as pJob->sql (the pointer is kept, the text is not copied).
 * startTs      forwarded to qExecExplainBegin() for explain jobs.
 * syncSchedule stored as pJob->attr.syncSchedule.
 *
 * On success the job has been added to schMgmt.jobRef and one reference has been
 * acquired with schAcquireJob(). Failures before the job is added to the ref pool
 * free it with schFreeJobImpl().
 */
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
                   int64_t startTs, bool syncSchedule) {
  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->attr.syncSchedule = syncSchedule;
  pJob->transport = transport;
  pJob->sql = sql;

  if (pNodeList != NULL) {
    pJob->nodeList = taosArrayDup(pNodeList);
    // A NULL duplicate is treated as an allocation failure. Ignoring it would leave
    // the job without candidate nodes and surface later as a misleading
    // "no available execNode" error.
    if (NULL == pJob->nodeList) {
      qError("QID:%" PRIx64 " taosArrayDup nodeList failed, size:%d", pDag->queryId,
             (int32_t)taosArrayGetSize(pNodeList));
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs));
  }

  pJob->execTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->succTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->succTasks) {
    SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->failTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->failTasks) {
    SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&pJob->rspSem, 0, 0);

  int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  // The job has already been handed to schMgmt.jobRef at this point, so this path
  // returns directly instead of freeing it through _return.
  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_NOT_START;

  *pSchJob = pJob;

  return TSDB_CODE_SUCCESS;

_return:

  schFreeJobImpl(pJob);
  SCH_RET(code);
}

D
dapan1121 已提交
144 145 146 147 148 149 150 151
/*
 * Release the resources held by an rpc context. The SRpcCtx struct itself is not
 * freed, and a NULL pCtx is ignored.
 *
 * - Every value stored in pCtx->args is released with its own freeFunc, when one is set.
 * - The args hash is destroyed and the pointer cleared.
 * - brokenVal is released with its freeFunc, when one is set.
 */
void schFreeRpcCtx(SRpcCtx *pCtx) {
  if (NULL == pCtx) {
    return;
  }

  void *pIter = taosHashIterate(pCtx->args, NULL);
  while (pIter) {
    SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;

    // Guard the destructor the same way brokenVal is guarded below, so an entry
    // registered without a freeFunc is not called through a NULL pointer.
    if (ctxVal->freeFunc) {
      (*ctxVal->freeFunc)(ctxVal->val);
    }

    pIter = taosHashIterate(pCtx->args, pIter);
  }

  taosHashCleanup(pCtx->args);
  pCtx->args = NULL;  // do not leave a dangling hash pointer in the context

  if (pCtx->brokenVal.freeFunc) {
    (*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
  }
}

L
Liu Jicong 已提交
164
/*
 * Free the resources owned by a task. The SSchTask struct itself is not freed.
 *
 * Owned resources: candidateAddrs, msg, children, parents and execNodes.
 * Array pointers are reset to NULL after destruction, so the task never keeps
 * referencing freed memory. This matches how msg is handled via taosMemoryFreeClear.
 */
void schFreeTask(SSchTask *pTask) {
  if (pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
    pTask->candidateAddrs = NULL;
  }

  taosMemoryFreeClear(pTask->msg);

  if (pTask->children) {
    taosArrayDestroy(pTask->children);
    pTask->children = NULL;
  }

  if (pTask->parents) {
    taosArrayDestroy(pTask->parents);
    pTask->parents = NULL;
  }

  if (pTask->execNodes) {
    taosArrayDestroy(pTask->execNodes);
    pTask->execNodes = NULL;
  }
}

D
dapan1121 已提交
184 185 186 187 188 189
/*
 * Tell whether the job is in a state where no further work should be scheduled:
 * FAILED, CANCELLED, CANCELLING, DROPPING or SUCCEED.
 *
 * pStatus: optional out-parameter that receives the job status that was inspected.
 */
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t curStatus = SCH_GET_JOB_STATUS(pJob);
  if (NULL != pStatus) {
    *pStatus = curStatus;
  }

  switch (curStatus) {
    case JOB_TASK_STATUS_FAILED:
    case JOB_TASK_STATUS_CANCELLED:
    case JOB_TASK_STATUS_CANCELLING:
    case JOB_TASK_STATUS_DROPPING:
    case JOB_TASK_STATUS_SUCCEED:
      return true;
    default:
      return false;
  }
}

D
dapan1121 已提交
195
/*
 * Validate a response of type msgType against the task's last sent request type
 * (SCH_GET_TASK_LASTMSG_TYPE) and current status. On acceptance, reset the
 * recorded last-message type to -1.
 *
 * - TDMT_SCH_LINK_BROKEN and TDMT_VND_EXPLAIN_RSP are always accepted.
 * - TDMT_VND_QUERY_RSP is always accepted; mismatches are only logged at debug
 *   level because a query response may be processed after the ready response.
 * - TDMT_VND_RES_READY_RSP and TDMT_VND_FETCH_RSP require the matching request
 *   type (or a last type of -1) and an EXECUTING/PARTIAL_SUCCEED task.
 * - TDMT_VND_CREATE_TABLE_RSP and TDMT_VND_SUBMIT_RSP require the matching request
 *   type and an EXECUTING/PARTIAL_SUCCEED task.
 * - Any other type yields TSDB_CODE_QRY_INVALID_INPUT.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_STATUS_ERROR on a type/status mismatch.
 */
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
  int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
  int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  // Presumably each response code is its request code + 1 (e.g. FETCH -> FETCH_RSP).
  int32_t reqMsgType = msgType - 1;
  switch (msgType) {
    case TDMT_SCH_LINK_BROKEN:
    case TDMT_VND_EXPLAIN_RSP:
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_QUERY_RSP:  // query_rsp may be processed later than ready_rsp
      if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
        SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
                      TMSG_INFO(msgType));
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_RES_READY_RSP:
      reqMsgType = TDMT_VND_QUERY;
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
                      (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_FETCH_RSP:
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_CREATE_TABLE_RSP:
    case TDMT_VND_SUBMIT_RSP:
      break;
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (lastMsgType != reqMsgType) {
    // lastMsgType can be -1 here (e.g. it was already reset by an earlier response).
    // Use the same guard as the RES_READY branch instead of passing it to TMSG_INFO.
    SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
                  (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
    SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                  TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);

  return TSDB_CODE_SUCCESS;
}

/*
 * Atomically move the job from its current status to newStatus when the
 * transition is allowed, retrying if another thread changes the status concurrently.
 *
 * Allowed transitions:
 *   NULL            -> NOT_START
 *   NOT_START       -> EXECUTING
 *   EXECUTING       -> PARTIAL_SUCCEED | FAILED | CANCELLING | CANCELLED | DROPPING
 *   PARTIAL_SUCCEED -> FAILED | SUCCEED | DROPPING
 *   SUCCEED | FAILED | CANCELLING -> DROPPING
 *
 * Returns TSDB_CODE_QRY_JOB_FREED when the job is CANCELLED or DROPPING and
 * TSDB_CODE_QRY_APP_ERROR for any other rejected transition (including
 * newStatus == current status).
 */
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    bool allowed = false;
    if (oriStatus != newStatus) {
      switch (oriStatus) {
        case JOB_TASK_STATUS_NULL:
          allowed = (newStatus == JOB_TASK_STATUS_NOT_START);
          break;
        case JOB_TASK_STATUS_NOT_START:
          allowed = (newStatus == JOB_TASK_STATUS_EXECUTING);
          break;
        case JOB_TASK_STATUS_EXECUTING:
          allowed = (newStatus == JOB_TASK_STATUS_PARTIAL_SUCCEED || newStatus == JOB_TASK_STATUS_FAILED ||
                     newStatus == JOB_TASK_STATUS_CANCELLING || newStatus == JOB_TASK_STATUS_CANCELLED ||
                     newStatus == JOB_TASK_STATUS_DROPPING);
          break;
        case JOB_TASK_STATUS_PARTIAL_SUCCEED:
          allowed = (newStatus == JOB_TASK_STATUS_FAILED || newStatus == JOB_TASK_STATUS_SUCCEED ||
                     newStatus == JOB_TASK_STATUS_DROPPING);
          break;
        case JOB_TASK_STATUS_SUCCEED:
        case JOB_TASK_STATUS_FAILED:
        case JOB_TASK_STATUS_CANCELLING:
          allowed = (newStatus == JOB_TASK_STATUS_DROPPING);
          break;
        case JOB_TASK_STATUS_CANCELLED:
        case JOB_TASK_STATUS_DROPPING:
          SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
          break;
        default:
          SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
          break;  // allowed stays false
      }
    }

    if (!allowed) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    // Only commit if nobody changed the status since it was read; otherwise re-evaluate.
    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      return TSDB_CODE_SUCCESS;
    }
  }

_return:

  SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  SCH_ERR_RET(code);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
348
/*
 * Link every task to its child and parent tasks using the subplan graph.
 *
 * planToTask maps an SSubplan pointer (the key is the pointer value) to an
 * SSchTask pointer.
 *
 * Validation rules:
 * - Tasks on the lowest level (pJob->levelIdx) must have no children.
 * - Level-0 tasks must have no parents; tasks on every other level must have at least one.
 * - For query jobs, level 0 must hold a single task.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR for an inconsistent plan and
 * TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failures.
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n indexes the parent list here, so label it as such in the log.
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) {
    SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
430
/*
 * Copy the candidate address at pTask->candidateIdx into pTask->succeedAddr.
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR if no address exists at that index.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (pAddr != NULL) {
    pTask->succeedAddr = *pAddr;
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                (int32_t)taosArrayGetSize(pTask->candidateAddrs));
  SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
443 444
/*
 * Append the (addr, handle) pair to pTask->execNodes.
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY if the push fails.
 */
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) {
  SSchNodeInfo nodeInfo = {.handle = handle};
  nodeInfo.addr = *addr;

  if (taosArrayPush(pTask->execNodes, &nodeInfo) != NULL) {
    SCH_TASK_DLOG("task execNode recorded, handle:%p", handle);
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
455

X
Xiaoyu Wang 已提交
456
/*
 * Validate the query plan and expand it into the job's levels and tasks.
 *
 * One SSchLevel is created per entry of pDag->pSubplans, and one SSchTask per
 * subplan in that level. Parent/child links are then built with
 * schBuildTaskRalation(). The temporary subplan->task map is always destroyed
 * before returning.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT for an empty or malformed plan and
 * TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failures. Otherwise it returns the
 * result of schInitTask() / schBuildTaskRalation().
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Keyed by the SSubplan pointer value, so the hash key width follows the pointer size.
  SHashObj *planToTask = taosHashInit(
      SCHEDULE_DEFAULT_MAX_TASK_NUM,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  pJob->subPlans = pDag->pSubplans;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    // Pre-sized to taskNum so the task pointers stored in planToTask stay valid.
    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = &task;  // presumably consumed by the SCH_TASK_* log macros

      SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));

      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        // The task never made it into the level, so nothing else will release
        // what schInitTask allocated for it (its execNodes list).
        schFreeTask(pTask);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTask failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:
  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
564 565
/*
 * Populate pTask->candidateAddrs, the addresses the task may be sent to.
 *
 * Sources, in order:
 * - A list that already holds at least one address is kept as is, and
 *   candidateIdx is left untouched.
 * - If the subplan's execNode has endpoints, that single address is used.
 * - Otherwise up to SCH_MAX_CANDIDATE_EP_NUM entries are copied from pJob->nodeList.
 *
 * candidateIdx is reset to 0 whenever the list is (re)filled.
 *
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY on allocation/push failures and
 * TSDB_CODE_QRY_INVALID_INPUT when no candidate address is available.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  // Only short-circuit on a non-empty list. A failed earlier attempt leaves an
  // allocated but empty list behind, and accepting that here would report success
  // with no candidates at all.
  if (NULL != pTask->candidateAddrs && taosArrayGetSize(pTask->candidateAddrs) > 0) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateIdx = 0;
  if (NULL == pTask->candidateAddrs) {
    pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
    if (NULL == pTask->candidateAddrs) {
      SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  int32_t addNum = 0;
  int32_t nodeNum = 0;
  if (pJob->nodeList) {
    nodeNum = (int32_t)taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
620

D
dapan1121 已提交
621
/*
 * Insert pTask into pJob->execTasks, keyed by its taskId.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR when the task is already present and
 * TSDB_CODE_QRY_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putCode = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == putCode) {
    SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(putCode)) {
    SCH_TASK_ELOG("task already in execTask list, code:%x", putCode);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
638 639
/*
 * Move pTask from pJob->execTasks to pJob->succTasks.
 *
 * A missing entry in execTasks is only logged as a warning.
 *
 * *moved is set to true when the task is (or already was) in succTasks, and
 * false otherwise.
 *
 * Returns TSDB_CODE_SCH_STATUS_ERROR if the task was already in succTasks and
 * TSDB_CODE_QRY_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Initialise the out-parameter up front, as schMoveTaskToFailList does, so the
  // out-of-memory path never leaves it unset.
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
664 665
/*
 * Move pTask from pJob->execTasks to pJob->failTasks.
 *
 * A missing entry in execTasks is only logged as a warning.
 *
 * *moved is set to true when the task is (or already was) in failTasks, and
 * false otherwise.
 *
 * Returns TSDB_CODE_SCH_STATUS_ERROR if the task was already in failTasks and
 * TSDB_CODE_QRY_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  int32_t removeCode = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (removeCode != 0) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t putCode = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == putCode) {
    *moved = true;
    SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(putCode)) {
    *moved = true;
    SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
691 692
/*
 * Move pTask from pJob->succTasks back to pJob->execTasks.
 *
 * A missing entry in succTasks is only logged as a warning.
 *
 * *moved is set to true when the task is (or already was) in execTasks, and
 * false otherwise.
 *
 * Returns TSDB_CODE_SCH_STATUS_ERROR if the task was already in execTasks and
 * TSDB_CODE_QRY_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Initialise the out-parameter up front, as schMoveTaskToFailList does, so the
  // out-of-memory path never leaves it unset.
  *moved = false;

  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
716
/*
 * Count one more attempt for pTask and decide whether it should be retried after errCode.
 *
 * *needRetry becomes true only if all of the following hold:
 * - the job does not need to stop;
 * - the retry budget (REQUEST_MAX_TRY_TIMES) is not exhausted;
 * - errCode is a retryable scheduler error;
 * - another endpoint (data-source task) or candidate address (other tasks) remains.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int8_t jobStatus = 0;

  pTask->tryTimes++;
  *needRetry = false;  // every early exit below means "do not retry"

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(jobStatus));
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) {
    SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  // TODO CHECK epList/condidateList
  if (SCH_IS_DATA_SRC_TASK(pTask)) {
    int32_t epNum = SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode);
    if (pTask->tryTimes >= epNum) {
      SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes, epNum);
      return TSDB_CODE_SUCCESS;
    }
  } else {
    int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
    if ((pTask->candidateIdx + 1) >= candidateNum) {
      SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                    pTask->candidateIdx, candidateNum);
      return TSDB_CODE_SUCCESS;
    }
  }

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

/*
 * Relaunch a task that is being retried.
 *
 * Steps:
 * 1. Decrement the level's taskLaunchedNum.
 * 2. When flow control applies, call schDecTaskFlowQuota() and
 *    schLaunchTasksInFlowCtrlList().
 * 3. Switch to the next endpoint (data-source task) or the next candidate
 *    address (other tasks).
 * 4. Launch the task again.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  SSchLevel *pTaskLevel = pTask->level;
  atomic_sub_fetch_32(&pTaskLevel->taskLaunchedNum, 1);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  if (SCH_IS_DATA_SRC_TASK(pTask)) {
    SCH_SWITCH_EPSET(&pTask->plan->execNode);
  } else {
    pTask->candidateIdx++;
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
782
/*
 * Replace the transport info of the heartbeat connection registered for epId in
 * schMgmt.hbConnections. The copy is made while holding that entry's write lock.
 *
 * Returns TSDB_CODE_QRY_APP_ERROR if no heartbeat connection exists for epId.
 */
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (hb == NULL) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  memcpy(&hb->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId,
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818
/*
 * Record errCode on the job, subject to these precedence rules:
 *  - TSDB_CODE_SUCCESS is never recorded;
 *  - when no error is recorded yet, the first writer wins (compare-and-swap);
 *  - an already recorded error accepted by NEED_CLIENT_HANDLE_ERROR is kept;
 *  - otherwise errCode overwrites it only if NEED_CLIENT_HANDLE_ERROR(errCode).
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (errCode == TSDB_CODE_SUCCESS) {
    return;
  }

  int32_t curCode = atomic_load_32(&pJob->errCode);
  if (curCode == TSDB_CODE_SUCCESS) {
    if (atomic_val_compare_exchange_32(&pJob->errCode, curCode, errCode) == curCode) {
      SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
      return;
    }

    // lost the race: re-read whatever another writer stored
    curCode = atomic_load_32(&pJob->errCode);
  }

  if (NEED_CLIENT_HANDLE_ERROR(curCode) || !NEED_CLIENT_HANDLE_ERROR(errCode)) {
    return;
  }

  atomic_store_32(&pJob->errCode, errCode);
  SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
}

/*
 * Shared failure path: move the job to `status`, record errCode and wake
 * whoever waits on rspSem (a user fetch or a synchronous scheduler).
 *
 * @return the job's recorded error code, which may differ from errCode when
 *         an earlier error takes precedence. If the status transition is
 *         rejected, that error is returned instead.
 */
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
  // a job that can no longer move to `status` (e.g. already FAILED) is not processed again
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));

  schUpdateJobErrCode(pJob, errCode);

  bool hasWaiter = atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule;
  if (hasWaiter) {
    tsem_post(&pJob->rspSem);
  }

  int32_t jobCode = atomic_load_32(&pJob->errCode);
  SCH_JOB_DLOG("job failed with error: %s", tstrerror(jobCode));

  SCH_RET(jobCode);
}

D
dapan1121 已提交
849
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
850 851 852 853 854 855 856 857 858
/* Fail the job with errCode; all error processing happens inside. */
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  int32_t targetStatus = JOB_TASK_STATUS_FAILED;
  SCH_RET(schProcessOnJobFailureImpl(pJob, targetStatus, errCode));
}

/* Move the job to DROPPING, recording errCode; all error processing happens inside. */
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  int32_t targetStatus = JOB_TASK_STATUS_DROPPING;
  SCH_RET(schProcessOnJobFailureImpl(pJob, targetStatus, errCode));
}

D
dapan1121 已提交
859
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
860 861
/*
 * Move the job to PARTIAL_SUCCEED once its result-producing task(s) succeeded.
 *
 * A synchronous scheduler blocked on rspSem is woken. If the user already
 * requested data (userFetch), a remote fetch is started right away; failure to
 * start it fails the whole job.
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  int32_t code = 0;

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED));

  if (pJob->attr.syncSchedule) {
    tsem_post(&pJob->rspSem);
  }

  if (0 == atomic_load_8(&pJob->userFetch)) {
    return TSDB_CODE_SUCCESS;
  }

  // the user is already waiting for data, so kick off the fetch now
  SCH_ERR_JRET(schFetchFromRemote(pJob));
  return TSDB_CODE_SUCCESS;

_return:
  SCH_RET(schProcessOnJobFailure(pJob, code));
}

880
/*
 * Mark a fetch round-trip as finished: clear the remoteFetch in-flight flag
 * (only a 1 -> 0 transition is attempted) and wake the waiter on rspSem.
 */
void schProcessOnDataFetched(SSchJob *job) {
  (void)atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
  tsem_post(&job->rspSem);
}

D
dapan1121 已提交
885
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
886
/*
 * Handle a failed attempt of pTask.
 *
 * If the job is already stopping, nothing is done and the job's error code is
 * returned. Otherwise schTaskCheckSetRetry decides whether the task gets
 * another attempt.
 *
 * Without a retry, the task must still be EXECUTING; it is moved to the fail
 * list and marked FAILED. For a wait-all job, errCode is returned while other
 * tasks of the level are still pending; the job is failed only once the level
 * is done. Any error on the failure path fails the whole job with errCode.
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  int8_t  status = 0;
  bool    needRetry = false;
  bool    moved = false;
  int32_t code = 0;

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
    SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }
  SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);

  if (SCH_IS_WAIT_ALL_JOB(pJob)) {
    int32_t taskDone = 0;

    SCH_LOCK(SCH_WRITE, &pTask->level->lock);
    pTask->level->taskFailed++;
    taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

    schUpdateJobErrCode(pJob, errCode);

    if (taskDone < pTask->level->taskNum) {
      // other tasks of this level are still pending; the job is failed by the last one
      SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
      SCH_RET(errCode);
    }
  }

_return:
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
}

D
dapan1121 已提交
939
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
940
/*
 * Handle a task that reported success.
 *
 * The task is moved to the success list, marked PARTIAL_SUCCEED and its
 * succeed node recorded. Tasks queued by flow control are then given a chance
 * to launch.
 *
 * - A task without parents produces the job result. For a wait-all job, the
 *   job completes only once every task of the level is done, and fails if any
 *   of them failed. Otherwise this task becomes the job's fetch task.
 * - A task with parents publishes its output location into each parent's
 *   plan, then launches every parent whose children are all ready.
 *
 * Errors from the bookkeeping steps fail the whole job via schProcessOnJobFailure.
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  bool    moved = false;
  int32_t code = 0;

  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);

  SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));

  SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum == 0) {
    int32_t taskDone = 0;
    if (SCH_IS_WAIT_ALL_JOB(pJob)) {
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      } else if (taskDone > pTask->level->taskNum) {
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
      }

      if (pTask->level->taskFailed > 0) {
        // errCode 0 is ignored by schUpdateJobErrCode; the failed task(s) already recorded theirs
        SCH_RET(schProcessOnJobFailure(pJob, 0));
      } else {
        SCH_RET(schProcessOnJobPartialSuccess(pJob));
      }
    } else {
      pJob->resNode = pTask->succeedAddr;
    }

    pJob->fetchTask = pTask;

    SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved));

    SCH_RET(schProcessOnJobPartialSuccess(pJob));
  }

  for (int32_t i = 0; i < parentNum; ++i) {
    SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);

    // Publish this child's output location into the parent's plan BEFORE counting
    // the child as ready. Counting first would let a sibling that finishes
    // concurrently reach the ready count and launch the parent while this
    // child's source is still missing from the parent's plan.
    SCH_LOCK(SCH_WRITE, &par->lock);
    SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
                                    .taskId = pTask->taskId,
                                    .schedId = schMgmt.sId,
                                    .addr = pTask->succeedAddr};
    qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
    SCH_UNLOCK(SCH_WRITE, &par->lock);

    int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1);

    if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
      // NOTE(review): returned directly rather than failing the job here; presumably
      // schLaunchTaskImpl processes task failures internally — confirm
      SCH_ERR_RET(schLaunchTaskImpl(pJob, par));
    }
  }

  return TSDB_CODE_SUCCESS;

_return:
  SCH_RET(schProcessOnJobFailure(pJob, code));
}
D
dapan 已提交
1018

D
dapan1121 已提交
1019 1020 1021
// Note: errors are fully processed inside this function; callers need not handle them again.
/*
 * Start fetching the job's result from pJob->resNode through pJob->fetchTask.
 *
 * remoteFetch acts as an in-flight flag: only the caller that flips it from
 * 0 to 1 sends a request. If a result is already waiting in resData, no
 * request is sent. On send failure the flag is cleared and the fetch task is
 * failed.
 */
int32_t schFetchFromRemote(SSchJob *pJob) {
  int32_t code = 0;

  int32_t prevFlag = atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1);
  if (prevFlag != 0) {
    SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
    return TSDB_CODE_SUCCESS;
  }

  void *pending = atomic_load_ptr(&pJob->resData);
  if (pending != NULL) {
    // an unconsumed result exists; release the flag without sending
    atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
    SCH_JOB_DLOG("res already fetched, res:%p", pending);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
  return TSDB_CODE_SUCCESS;

_return:
  atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}

D
dapan1121 已提交
1047 1048
/*
 * Publish the final explain result. The row count and pRsp become the job's
 * result data, the task is marked SUCCEED and the waiting fetcher is woken.
 * numOfRows in pRsp is in network byte order.
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  int32_t rows = (int32_t)htonl(pRsp->numOfRows);

  SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", rows, pRsp->completed);

  // the row count is stored before the result pointer is published
  atomic_store_32(&pJob->resNumOfRows, rows);
  atomic_store_ptr(&pJob->resData, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);

  schProcessOnDataFetched(pJob);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1060
// Note: no more task error processing, handled in function internal
L
Liu Jicong 已提交
1061 1062
/*
 * Dispatch a response received for pTask according to msgType.
 *
 * @param pJob     job owning the task
 * @param pTask    task the response belongs to
 * @param msgType  response type (a TDMT_*_RSP value or TDMT_SCH_LINK_BROKEN)
 * @param msg      response payload; may be NULL
 * @param msgSize  payload size in bytes
 * @param rspCode  RPC-level result of the request
 * @return TSDB_CODE_SUCCESS or an error code. Errors detected here are routed
 *         through schProcessOnTaskFailure. If the job is already stopping, the
 *         job's error code is returned without processing the response.
 *
 * On the successful fetch paths, the payload is either stored as
 * pJob->resData or freed here.
 */
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
                             int32_t rspCode) {
  int32_t code = 0;
  int8_t  status = 0;

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
                  rspCode);
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE_RSP: {
      SVCreateTbBatchRsp batchRsp = {0};
      if (msg) {
        SCH_ERR_JRET(tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp));
        if (batchRsp.rspList) {
          // Copy the first client-visible error out of the list. The elements live
          // inside rspList, so they must not be read after it is destroyed.
          int32_t tbCode = TSDB_CODE_SUCCESS;
          int32_t num = taosArrayGetSize(batchRsp.rspList);
          for (int32_t i = 0; i < num; ++i) {
            SVCreateTbRsp *rsp = taosArrayGet(batchRsp.rspList, i);
            if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
              tbCode = rsp->code;
              break;
            }
          }

          taosArrayDestroy(batchRsp.rspList);
          SCH_ERR_JRET(tbCode);
        }
      }

      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
    case TDMT_VND_SUBMIT_RSP: {
      SSubmitRsp *rsp = (SSubmitRsp *)msg;
      if (rsp) {
        SCH_ERR_JRET(rsp->code);
      }

      SCH_ERR_JRET(rspCode);

      if (rsp) {
        // resNumOfRows is updated through atomic helpers everywhere else. Responses
        // of different submit tasks of one job may be handled concurrently
        // (NOTE(review): confirm callback threading), so accumulate atomically too.
        atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
      }

      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
    case TDMT_VND_QUERY_RSP: {
      SQueryTableRsp rsp = {0};
      if (msg) {
        SCH_ERR_JRET(tDeserializeSQueryTableRsp(msg, msgSize, &rsp));
        SCH_ERR_JRET(rsp.code);
      }

      SCH_ERR_JRET(rspCode);

      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      break;
    }
    case TDMT_VND_RES_READY_RSP: {
      SResReadyRsp *rsp = (SResReadyRsp *)msg;

      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
      SCH_ERR_JRET(rsp->code);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

      break;
    }
    case TDMT_VND_EXPLAIN_RSP: {
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (!SCH_IS_EXPLAIN_JOB(pJob)) {
        SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (pJob->resData) {
        SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SExplainRsp rsp = {0};
      if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
        taosMemoryFree(rsp.subplanInfo);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SRetrieveTableRsp *pRsp = NULL;
      SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));

      if (pRsp) {
        SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
      }
      break;
    }
    case TDMT_VND_FETCH_RSP: {
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;

      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (SCH_IS_EXPLAIN_JOB(pJob)) {
        // For explain jobs only the `completed` flag of the fetched payload is used;
        // the payload is never stored in pJob->resData, so release it here.
        bool completed = rsp->completed;
        taosMemoryFreeClear(rsp);

        if (completed) {
          SRetrieveTableRsp *pRsp = NULL;
          SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
          if (pRsp) {
            SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
          }

          return TSDB_CODE_SUCCESS;
        }

        // not finished yet: clear the in-flight flag and fetch again
        atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

        SCH_ERR_JRET(schFetchFromRemote(pJob));

        return TSDB_CODE_SUCCESS;
      }

      if (pJob->resData) {
        SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData);
        taosMemoryFreeClear(rsp);
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      // ownership of the payload passes to the job here
      atomic_store_ptr(&pJob->resData, rsp);
      atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));

      if (rsp->completed) {
        SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
      }

      SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

      schProcessOnDataFetched(pJob);
      break;
    }
    case TDMT_VND_DROP_TASK_RSP: {
      // SHOULD NEVER REACH HERE
      SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
    }
    case TDMT_SCH_LINK_BROKEN:
      SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
      break;
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

D
dapan1121 已提交
1239
/*
 * Look up taskId in pTaskList. *pTask is written only when a non-NULL entry
 * is found; otherwise it is left untouched. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
  int32_t entryNum = taosHashGetSize(pTaskList);

  if (entryNum > 0) {
    SSchTask **found = taosHashGet(pTaskList, &taskId, sizeof(taskId));
    if (found != NULL && *found != NULL) {
      *pTask = *found;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1255
/*
 * Store the RPC handle on the task's exec node, but only when the request
 * succeeded (rspCode == 0) and the task has exactly one exec node.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
  bool singleNode = (0 == rspCode) && (NULL != pTask->execNodes) && (1 == taosArrayGetSize(pTask->execNodes));

  if (singleNode) {
    SSchNodeInfo *nodeInfo = taosArrayGet(pTask->execNodes, 0);
    nodeInfo->handle = handle;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1267
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
L
Liu Jicong 已提交
1268
  int32_t                code = 0;
D
dapan1121 已提交
1269
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
L
Liu Jicong 已提交
1270
  SSchTask              *pTask = NULL;
L
Liu Jicong 已提交
1271

D
dapan1121 已提交
1272
  SSchJob *pJob = schAcquireJob(pParam->refId);
D
dapan1121 已提交
1273
  if (NULL == pJob) {
D
dapan1121 已提交
1274
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64,
L
Liu Jicong 已提交
1275
          pParam->queryId, pParam->taskId, pParam->refId);
D
dapan1121 已提交
1276
    SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
1277 1278
  }

D
dapan1121 已提交
1279 1280 1281 1282 1283
  schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
  if (NULL == pTask) {
    if (TDMT_VND_EXPLAIN_RSP == msgType) {
      schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
    } else {
H
Hongze Cheng 已提交
1284 1285
      SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                   pParam->taskId);
D
dapan1121 已提交
1286 1287
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
D
dapan1121 已提交
1288
  }
H
Hongze Cheng 已提交
1289

D
dapan1121 已提交
1290
  if (NULL == pTask) {
H
Hongze Cheng 已提交
1291 1292
    SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                 pParam->taskId);
D
dapan1121 已提交
1293 1294
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }
D
dapan1121 已提交
1295

D
dapan1121 已提交
1296
  SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
L
Liu Jicong 已提交
1297

L
Liu Jicong 已提交
1298
  SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
D
dapan1121 已提交
1299
  schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
D
dapan1121 已提交
1300
  SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
D
dapan1121 已提交
1301

H
Haojun Liao 已提交
1302
_return:
D
dapan1121 已提交
1303
  if (pJob) {
D
dapan1121 已提交
1304
    schReleaseJob(pParam->refId);
D
dapan1121 已提交
1305 1306
  }

wafwerar's avatar
wafwerar 已提交
1307
  taosMemoryFreeClear(param);
D
dapan1121 已提交
1308 1309 1310
  SCH_RET(code);
}

L
Liu Jicong 已提交
1311
int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1312 1313
  return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
H
Haojun Liao 已提交
1314

L
Liu Jicong 已提交
1315
int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
H
Haojun Liao 已提交
1316 1317 1318
  return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}

L
Liu Jicong 已提交
1319
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1320 1321
  return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
H
Haojun Liao 已提交
1322

L
Liu Jicong 已提交
1323
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1324 1325
  return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
H
Haojun Liao 已提交
1326

L
Liu Jicong 已提交
1327
int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1328 1329
  return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
H
Haojun Liao 已提交
1330

D
dapan1121 已提交
1331 1332 1333 1334
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_EXPLAIN_RSP, code);
}

L
Liu Jicong 已提交
1335
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1336
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
L
Liu Jicong 已提交
1337
  qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
1338
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1339 1340
}

L
Liu Jicong 已提交
1341
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1342 1343 1344 1345
  if (code) {
    qError("hb rsp error:%s", tstrerror(code));
    SCH_ERR_RET(code);
  }
L
Liu Jicong 已提交
1346

D
dapan1121 已提交
1347 1348 1349 1350 1351 1352
  SSchedulerHbRsp rsp = {0};
  if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
    qError("invalid hb rsp msg, size:%d", pMsg->len);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

D
dapan1121 已提交
1353 1354
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;

D
dapan1121 已提交
1355 1356 1357
  SSchTrans trans = {0};
  trans.transInst = pParam->transport;
  trans.transHandle = pMsg->handle;
L
Liu Jicong 已提交
1358

D
dapan1121 已提交
1359
  SCH_ERR_RET(schUpdateHbConnection(&rsp.epId, &trans));
D
dapan1121 已提交
1360 1361

  int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
L
Liu Jicong 已提交
1362 1363
  qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
         rsp.epId.ep.port);
D
dapan1121 已提交
1364

D
dapan1121 已提交
1365 1366
  for (int32_t i = 0; i < taskNum; ++i) {
    STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
L
Liu Jicong 已提交
1367

D
dapan1121 已提交
1368 1369
    SSchJob *pJob = schAcquireJob(taskStatus->refId);
    if (NULL == pJob) {
L
Liu Jicong 已提交
1370 1371 1372
      qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
            taskStatus->queryId, taskStatus->taskId);
      // TODO DROP TASK FROM SERVER!!!!
D
dapan1121 已提交
1373 1374
      continue;
    }
L
Liu Jicong 已提交
1375

D
dapan1121 已提交
1376
    // TODO
L
Liu Jicong 已提交
1377 1378 1379

    SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId,
                 jobTaskStatusStr(taskStatus->status));
D
dapan1121 已提交
1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390

    schReleaseJob(taskStatus->refId);
  }

_return:

  tFreeSSchedulerHbRsp(&rsp);

  SCH_RET(code);
}

D
dapan1121 已提交
1391 1392 1393 1394
int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
  rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);

D
dapan1121 已提交
1395 1396
  qDebug("handle %p is broken", pMsg->handle);

D
dapan1121 已提交
1397 1398
  if (head->isHbParam) {
    SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
L
Liu Jicong 已提交
1399
    SSchTrans            trans = {.transInst = hbParam->transport, .transHandle = NULL};
D
dapan1121 已提交
1400 1401 1402 1403 1404 1405 1406 1407 1408 1409
    SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));

    SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId));
  } else {
    SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1410
/*
 * Select the RPC response callback for a request message type.
 *
 * @param msgType  request type (TDMT_VND_* or TDMT_SCH_LINK_BROKEN)
 * @param fp       receives the callback; left untouched for unknown types
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR for an unknown msgType
 */
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
  __async_send_cb_fn_t cb = NULL;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:     cb = schHandleCreateTableCallback; break;
    case TDMT_VND_SUBMIT:           cb = schHandleSubmitCallback;      break;
    case TDMT_VND_QUERY:            cb = schHandleQueryCallback;       break;
    case TDMT_VND_RES_READY:        cb = schHandleReadyCallback;       break;
    case TDMT_VND_EXPLAIN:          cb = schHandleExplainCallback;     break;
    case TDMT_VND_FETCH:            cb = schHandleFetchCallback;       break;
    case TDMT_VND_DROP_TASK:        cb = schHandleDropCallback;        break;
    case TDMT_VND_QUERY_HEARTBEAT:  cb = schHandleHbCallback;          break;
    case TDMT_SCH_LINK_BROKEN:      cb = schHandleLinkBrokenCallback;  break;
    default:
      qError("unknown msg type for callback, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  *fp = cb;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1447
/*
 * Build the SMsgSendInfo passed to the transport for a task request.
 *
 * It carries a freshly allocated SSchTaskCallbackParam (query/job/task ids
 * plus transport) and the callback selected for msgType. On success
 * *pMsgSendInfo owns both allocations; on failure nothing is leaked.
 */
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
  int32_t                code = 0;
  SSchTaskCallbackParam *param = NULL;

  SMsgSendInfo *sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (param == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  __async_send_cb_fn_t cb = NULL;
  SCH_ERR_JRET(schGetCallbackFp(msgType, &cb));

  param->queryId = pJob->queryId;
  param->refId = pJob->refId;
  param->taskId = SCH_TASK_ID(pTask);
  param->transport = pJob->transport;

  sendInfo->param = param;
  sendInfo->fp = cb;

  *pMsgSendInfo = sendInfo;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFree(param);
  taosMemoryFree(sendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1484
void schFreeRpcCtxVal(const void *arg) {
D
dapan1121 已提交
1485 1486 1487
  if (NULL == arg) {
    return;
  }
L
Liu Jicong 已提交
1488 1489

  SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
D
dapan1121 已提交
1490 1491
  taosMemoryFreeClear(pMsgSendInfo->param);
  taosMemoryFreeClear(pMsgSendInfo);
D
dapan1121 已提交
1492
}
D
dapan1121 已提交
1493

D
dapan1121 已提交
1494
/*
 * Allocate a zeroed SSchTaskCallbackParam that identifies pTask within pJob.
 * It records queryId, refId, task id and the job transport.
 *
 * On success the caller owns *pParam. On allocation failure
 * TSDB_CODE_QRY_OUT_OF_MEMORY is returned and *pParam is untouched.
 */
int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchTaskCallbackParam *cbParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (cbParam == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  cbParam->transport = pJob->transport;
  cbParam->taskId = SCH_TASK_ID(pTask);
  cbParam->refId = pJob->refId;
  cbParam->queryId = pJob->queryId;

  *pParam = cbParam;
  return TSDB_CODE_SUCCESS;
}

/*
 * Allocate a zeroed SSchHbCallbackParam for the task's current candidate node.
 *
 * The param is flagged as a heartbeat param (head.isHbParam) and records the
 * node id, the node's current endpoint and the job transport.
 *
 * On success the caller owns *pParam. On allocation failure
 * TSDB_CODE_QRY_OUT_OF_MEMORY is returned and *pParam is untouched.
 */
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchHbCallbackParam *hbParam = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (hbParam == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SQueryNodeAddr *curAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  hbParam->head.isHbParam = true;
  hbParam->nodeEpId.nodeId = curAddr->nodeId;
  memcpy(&hbParam->nodeEpId.ep, SCH_GET_CUR_EP(curAddr), sizeof(SEp));
  hbParam->transport = pJob->transport;

  *pParam = hbParam;
  return TSDB_CODE_SUCCESS;
}

/*
 * Fill *brokenVal with the context the transport uses when the link to the
 * task's node breaks.
 *
 * That context is a TDMT_SCH_LINK_BROKEN SMsgSendInfo whose param is a
 * heartbeat param (isHb) or a task callback param (!isHb), plus clone and free
 * hooks for that SMsgSendInfo.
 *
 * On success *brokenVal owns the SMsgSendInfo. On failure every allocation made
 * here is released and *brokenVal is left unmodified.
 */
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
  int32_t              code = 0;
  int32_t              msgType = TDMT_SCH_LINK_BROKEN;
  __async_send_cb_fn_t fp = NULL;
  SMsgSendInfo        *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));

  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (isHb) {
    SCH_ERR_JRET(schMakeHbCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  } else {
    SCH_ERR_JRET(schMakeTaskCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  pMsgSendInfo->fp = fp;

  brokenVal->msgType = msgType;
  brokenVal->val = pMsgSendInfo;
  brokenVal->clone = schCloneSMsgSendInfo;
  brokenVal->freeFunc = schFreeRpcCtxVal;

  return TSDB_CODE_SUCCESS;

_return:

  // pMsgSendInfo is NULL when its own allocation failed; reaching ->param
  // through it would dereference NULL, so only clean up what exists.
  if (pMsgSendInfo) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1568
/*
 * Build the RPC context attached to a TDMT_VND_QUERY request.
 *
 * pCtx->args maps TDMT_VND_RES_READY_RSP and TDMT_VND_EXPLAIN_RSP to
 * SRpcCtxVal entries. Each entry wraps an SMsgSendInfo that routes the
 * response to the task's TDMT_VND_RES_READY / TDMT_VND_EXPLAIN callback.
 * pCtx->brokenVal is filled by schMakeBrokenLinkVal with a task callback param.
 *
 * On failure every allocation made here is released and pCtx->args is reset
 * to NULL, so the caller never holds a pointer to a destroyed hash table.
 */
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t       code = 0;
  int32_t       msgType = 0;
  SRpcCtxVal    ctxVal = {0};
  SMsgSendInfo *pReadyMsgSendInfo = NULL;
  SMsgSendInfo *pExplainMsgSendInfo = NULL;

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo));
  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));

  msgType = TDMT_VND_RES_READY_RSP;
  ctxVal.val = pReadyMsgSendInfo;
  ctxVal.clone = schCloneSMsgSendInfo;
  ctxVal.freeFunc = schFreeRpcCtxVal;
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  msgType = TDMT_VND_EXPLAIN_RSP;
  ctxVal.val = pExplainMsgSendInfo;
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));

  return TSDB_CODE_SUCCESS;

_return:

  // No free callback is registered on pCtx->args here, so (presumably)
  // cleaning the table releases only its own nodes. The SMsgSendInfo objects
  // are freed explicitly below.
  taosHashCleanup(pCtx->args);
  pCtx->args = NULL;

  if (pReadyMsgSendInfo) {
    taosMemoryFreeClear(pReadyMsgSendInfo->param);
    taosMemoryFreeClear(pReadyMsgSendInfo);
  }

  if (pExplainMsgSendInfo) {
    taosMemoryFreeClear(pExplainMsgSendInfo->param);
    taosMemoryFreeClear(pExplainMsgSendInfo);
  }

  SCH_RET(code);
}

/*
 * Build the RPC context for the persistent heartbeat connection to the task's
 * current candidate node.
 *
 * pCtx->args maps TDMT_VND_QUERY_HEARTBEAT_RSP to an SMsgSendInfo whose
 * SSchHbCallbackParam carries the node's endpoint and the job transport. The
 * response is routed to the TDMT_VND_QUERY_HEARTBEAT callback.
 * pCtx->brokenVal is filled by schMakeBrokenLinkVal with a heartbeat param.
 *
 * On failure every allocation made here is released and pCtx->args is reset
 * to NULL.
 */
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t              code = 0;
  int32_t              msgType = TDMT_VND_QUERY_HEARTBEAT_RSP;
  __async_send_cb_fn_t fp = NULL;
  SSchHbCallbackParam *param = NULL;
  SMsgSendInfo        *pMsgSendInfo = NULL;
  SQueryNodeAddr      *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId       epId = {0};

  epId.nodeId = addr->nodeId;
  memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (NULL == param) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));

  // schCloneCallbackParam chooses the struct type (and copy size) from this
  // flag; without it the heartbeat param would be cloned as an
  // SSchTaskCallbackParam. schMakeHbCallbackParam sets it for the same reason.
  param->head.isHbParam = true;
  param->nodeEpId = epId;
  param->transport = pJob->transport;

  pMsgSendInfo->param = param;
  pMsgSendInfo->fp = fp;

  SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));

  return TSDB_CODE_SUCCESS;

_return:

  // Do not leave a pointer to the destroyed table in the caller's context.
  taosHashCleanup(pCtx->args);
  pCtx->args = NULL;

  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1674
/*
 * Register a heartbeat connection for epId in schMgmt.hbConnections.
 *
 * The stored SSchHbTrans holds the job transport and a heartbeat RPC context
 * built by schMakeHbRpcCtx. If the table already has an entry for epId
 * (HASH_NODE_EXIST), the context built here is freed, *exist is set to true
 * and success is returned. Any other insertion failure is logged and returned.
 */
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  int32_t     code = 0;
  SSchHbTrans hb = {0};

  hb.trans.transInst = pJob->transport;
  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));

  code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
  if (TSDB_CODE_SUCCESS == code) {
    return TSDB_CODE_SUCCESS;
  }

  // the entry was not stored, so the context is still ours to release
  schFreeRpcCtx(&hb.rpcCtx);

  if (HASH_NODE_EXIST(code)) {
    *exist = true;
    return TSDB_CODE_SUCCESS;
  }

  qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
  SCH_RET(code);
}

/*
 * Shallow-copy a scheduler callback param into a new heap block.
 *
 * The header flag isHbParam selects the concrete type (SSchHbCallbackParam or
 * SSchTaskCallbackParam), and that type decides how many bytes are allocated
 * and copied. On success the caller owns *pDst.
 */
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
  if (!pSrc->isHbParam) {
    SSchTaskCallbackParam *taskParam = taosMemoryMalloc(sizeof(SSchTaskCallbackParam));
    if (taskParam == NULL) {
      qError("malloc SSchTaskCallbackParam failed");
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(taskParam, pSrc, sizeof(*taskParam));
    *pDst = (SSchCallbackParamHeader *)taskParam;
    return TSDB_CODE_SUCCESS;
  }

  SSchHbCallbackParam *hbParam = taosMemoryMalloc(sizeof(SSchHbCallbackParam));
  if (hbParam == NULL) {
    qError("malloc SSchHbCallbackParam failed");
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(hbParam, pSrc, sizeof(*hbParam));
  *pDst = (SSchCallbackParamHeader *)hbParam;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1724 1725
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
  SMsgSendInfo *pSrc = src;
L
Liu Jicong 已提交
1726
  int32_t       code = 0;
D
dapan1121 已提交
1727
  SMsgSendInfo *pDst = taosMemoryMalloc(sizeof(*pSrc));
D
dapan1121 已提交
1728
  if (NULL == pDst) {
D
dapan1121 已提交
1729 1730 1731 1732
    qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1733 1734
  memcpy(pDst, pSrc, sizeof(*pSrc));
  pDst->param = NULL;
D
dapan1121 已提交
1735

D
dapan1121 已提交
1736
  SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
D
dapan1121 已提交
1737

D
dapan1121 已提交
1738
  *dst = pDst;
D
dapan1121 已提交
1739

D
dapan1121 已提交
1740
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1741

D
dapan1121 已提交
1742 1743
_return:

D
dapan1121 已提交
1744
  taosMemoryFreeClear(pDst);
D
dapan1121 已提交
1745 1746 1747 1748 1749 1750 1751
  SCH_RET(code);
}

/*
 * Deep-copy a heartbeat RPC context.
 *
 * brokenVal and every SRpcCtxVal in pSrc->args are duplicated with
 * schCloneSMsgSendInfo, so pDst can be handed to the transport and freed
 * independently of pSrc.
 *
 * pDst is expected to be zero-initialized by the caller. On failure, whatever
 * was already cloned into pDst is released with schFreeRpcCtx.
 */
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
  int32_t code = 0;
  void   *pIter = NULL;

  memcpy(&pDst->brokenVal, &pSrc->brokenVal, sizeof(pSrc->brokenVal));
  pDst->brokenVal.val = NULL;

  SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));

  pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pDst->args) {
    qError("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pIter = taosHashIterate(pSrc->args, NULL);
  while (pIter) {
    SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
    int32_t    *msgType = taosHashGetKey(pIter, NULL);
    SRpcCtxVal  dst = *pVal;

    dst.val = NULL;

    // Leaving the loop before taosHashIterate returns NULL must go through
    // taosHashCancelIterate; otherwise the iterator's hold on the current
    // node is never released.
    code = schCloneSMsgSendInfo(pVal->val, &dst.val);
    if (TSDB_CODE_SUCCESS != code) {
      taosHashCancelIterate(pSrc->args, pIter);
      SCH_ERR_JRET(code);
    }

    if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) {
      qError("taosHashPut msg %d to rpcCtx failed", *msgType);
      (*dst.freeFunc)(dst.val);
      taosHashCancelIterate(pSrc->args, pIter);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pIter = taosHashIterate(pSrc->args, pIter);
  }

  return TSDB_CODE_SUCCESS;

_return:

  schFreeRpcCtx(pDst);
  SCH_RET(code);
}

L
Liu Jicong 已提交
1789 1790
/*
 * Send msg (msgSize bytes) of type msgType to epSet asynchronously.
 *
 * transport is an SSchTrans (transport instance + connection handle). The
 * async ahandle is built by schGenerateTaskCallBackAHandle. persistHandle and
 * ctx (which may be NULL) are passed straight to asyncSendMsgToServerExt.
 *
 * On failure only the ahandle created here is freed. msg and ctx are not
 * released by this function.
 */
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet *epSet, int32_t msgType, void *msg,
                        uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
  int32_t       code = 0;
  int64_t       transporterId = 0;
  SSchTrans    *trans = (SSchTrans *)transport;
  SMsgSendInfo *pMsgSendInfo = NULL;

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, msgType, &pMsgSendInfo));

  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans->transHandle;
  pMsgSendInfo->msgType = msgType;

  // ", " was missing between refId and instance, fusing the two in the log line
  qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 ", instance:%p, handle:%p", TMSG_INFO(msgType),
         ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
         trans->transInst, trans->transHandle);

  code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
  if (code) {
    SCH_ERR_JRET(code);
  }

  SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType));
  return TSDB_CODE_SUCCESS;

_return:

  if (pMsgSendInfo) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1826 1827
/*
 * Serialize an SSchedulerHbReq for nodeEpId and send it on the heartbeat
 * connection registered for that node in schMgmt.hbConnections.
 *
 * The connection's RPC context is cloned (schCloneHbRpcCtx) and its transport
 * copied while holding the entry's lock. The message is sent with a persistent
 * handle.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR when no connection is registered for
 * nodeEpId. On any failure after the clone, every buffer allocated here and
 * the cloned context are released.
 */
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
  SSchedulerHbReq        req = {0};
  int32_t                code = 0;
  SRpcCtx                rpcCtx = {0};
  SSchTrans              trans = {0};
  int32_t                msgType = TDMT_VND_QUERY_HEARTBEAT;
  int32_t                msgSize = 0;
  int64_t                transporterId = 0;
  SEpSet                 epSet = {.inUse = 0, .numOfEps = 1};
  __async_send_cb_fn_t   fp = NULL;
  // Declared and NULL-initialized before the first SCH_ERR_JRET: _return frees
  // all three, so none of them may still be uninitialized when it is reached.
  void                  *msg = NULL;
  SMsgSendInfo          *pMsgSendInfo = NULL;
  SSchTaskCallbackParam *param = NULL;

  req.header.vgId = nodeEpId->nodeId;
  req.sId = schMgmt.sId;
  memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));

  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
           nodeEpId->ep.port);
    // code is still 0 here; returning it would report success for a heartbeat
    // that was never sent.
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
  memcpy(&trans, &hb->trans, sizeof(trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  SCH_ERR_RET(code);

  msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    qError("calloc hb req %d failed", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    qError("calloc SMsgSendInfo failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == param) {
    qError("calloc SSchTaskCallbackParam failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  param->transport = trans.transInst;

  pMsgSendInfo->param = param;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans.transHandle;
  pMsgSendInfo->msgType = msgType;
  pMsgSendInfo->fp = fp;

  memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));

  qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
         nodeEpId->ep.fqdn, nodeEpId->ep.port);

  code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
  if (code) {
    qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
           trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
    SCH_ERR_JRET(code);
  }

  qDebug("hb msg sent");
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFreeClear(msg);
  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);
  schFreeRpcCtx(&rpcCtx);
  SCH_RET(code);
}

D
dapan1121 已提交
1917
/*
 * Build the request for msgType and send it asynchronously to addr. When addr
 * is NULL, the task's current candidate address is used.
 *
 * Handled types:
 *   TDMT_VND_CREATE_TABLE / TDMT_VND_SUBMIT - copy of pTask->msg
 *   TDMT_VND_QUERY            - SSubQueryMsg with SQL text + serialized plan,
 *                               query RPC context, persistent handle
 *   TDMT_VND_RES_READY        - SResReadyReq
 *   TDMT_VND_FETCH            - SResFetchReq
 *   TDMT_VND_DROP_TASK        - STaskDropReq
 *   TDMT_VND_QUERY_HEARTBEAT  - serialized SSchedulerHbReq, heartbeat RPC
 *                               context, persistent handle
 * Any other type returns TSDB_CODE_SCH_INTERNAL_ERROR.
 *
 * The task's last message type is set to msgType before sending and reset to
 * -1 on failure. For TDMT_VND_QUERY the executing node is recorded after the
 * message has been handed to schAsyncSendMsg.
 */
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
  uint32_t msgSize = 0;
  void    *msg = NULL;
  int32_t  code = 0;
  bool     persistHandle = false;
  SRpcCtx  rpcCtx = {0};

  if (NULL == addr) {
    addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  }

  SEpSet epSet = addr->epSet;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_SUBMIT: {
      msgSize = pTask->msgLen;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", (int32_t)msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      memcpy(msg, pTask->msg, msgSize);
      break;
    }

    case TDMT_VND_QUERY: {
      // schMakeQueryRpcCtx releases its own allocations on failure.
      SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));

      uint32_t len = strlen(pJob->sql);
      msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", (int32_t)msgSize);
        // rpcCtx is already populated: go through _return so it is freed
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSubQueryMsg *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      pMsg->taskType = TASK_TYPE_TEMP;
      pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
      pMsg->phyLen = htonl(pTask->msgLen);
      pMsg->sqlLen = htonl(len);

      // payload layout: SQL text immediately followed by the serialized plan
      memcpy(pMsg->msg, pJob->sql, len);
      memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);

      persistHandle = true;
      break;
    }

    case TDMT_VND_RES_READY: {
      msgSize = sizeof(SResReadyReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", (int32_t)msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResReadyReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      break;
    }
    case TDMT_VND_FETCH: {
      msgSize = sizeof(SResFetchReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", (int32_t)msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResFetchReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);

      break;
    }
    case TDMT_VND_DROP_TASK: {
      msgSize = sizeof(STaskDropReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", (int32_t)msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      STaskDropReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      break;
    }
    case TDMT_VND_QUERY_HEARTBEAT: {
      // schMakeHbRpcCtx releases its own allocations on failure.
      SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));

      SSchedulerHbReq req = {0};
      req.sId = schMgmt.sId;
      req.header.vgId = addr->nodeId;
      req.epId.nodeId = addr->nodeId;
      memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

      // msgSize is unsigned, so the serializer's signed result must be checked
      // before it is stored there (a `msgSize < 0` test can never fire).
      int32_t hbMsgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
      if (hbMsgSize < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", hbMsgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgSize = (uint32_t)hbMsgSize;

      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_JOB_ELOG("calloc %d failed", (int32_t)msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", (int32_t)msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      persistHandle = true;
      break;
    }
    default:
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);

  SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
  SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
                               (rpcCtx.args ? &rpcCtx : NULL)));

  // The message is now owned by the transport; do not jump to _return from here.
  if (msgType == TDMT_VND_QUERY) {
    SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
  schFreeRpcCtx(&rpcCtx);

  taosMemoryFreeClear(msg);
  SCH_RET(code);
}
D
dapan1121 已提交
2081

D
dapan1121 已提交
2082 2083
/*
 * Make sure a heartbeat connection exists for the task's current candidate
 * node.
 *
 * If none is registered yet, one is registered. If this call's registration
 * was the one stored (exist stays false), the first heartbeat is sent. When
 * the registration reports an existing entry, no heartbeat is sent here.
 */
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  epId = {0};

  epId.nodeId = addr->nodeId;
  memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

  if (NULL != taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId))) {
    return TSDB_CODE_SUCCESS;
  }

  bool exist = false;
  SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist));
  if (exist) {
    // an entry for epId appeared after the lookup above; it was not ours
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schBuildAndSendHbMsg(&epId));
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
2102

D
dapan1121 已提交
2103
/*
 * Launch a single task. The steps are:
 *   1. count the launch on the task's level;
 *   2. return the job's error code if the job should stop;
 *   3. move the task into the executing list;
 *   4. serialize its subplan if pTask->msg is not set yet;
 *   5. resolve candidate addresses;
 *   6. ensure a heartbeat connection (query jobs only);
 *   7. send the plan's message.
 */
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  int8_t    status = 0;
  int32_t   code = 0;
  SSubplan *plan = NULL;

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status));
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  // NOTE: race condition: the task should be put into the hash table before send msg to server
  if (JOB_TASK_STATUS_EXECUTING != SCH_GET_TASK_STATUS(pTask)) {
    SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
  }

  plan = pTask->plan;

  // the serialized plan is kept on the task; only build it when missing
  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }

    SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));

  return TSDB_CODE_SUCCESS;
}

// Note: callers need no further error handling; failures are handled internally via schProcessOnTaskFailure
/*
 * Launch pTask after clearing its handle.
 *
 * When flow control applies to the task, the launch only happens if
 * schCheckIncTaskFlowQuota reports enough quota; otherwise this is a silent
 * no-op. Any failure is passed to schProcessOnTaskFailure, and its result is
 * returned.
 */
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  bool    enough = false;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
    if (!enough) {
      return TSDB_CODE_SUCCESS;
    }
  }

  SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
D
dapan1121 已提交
2168

D
dapan1121 已提交
2169
/*
 * Launch every task of a level in order, stopping at the first error that
 * schLaunchTask reports.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  int32_t idx = 0;

  while (idx < level->taskNum) {
    SSchTask *pTask = taosArrayGet(level->subTasks, idx);
    SCH_ERR_RET(schLaunchTask(pJob, pTask));
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Start executing a job. The steps are:
 *   1. move the job to EXECUTING;
 *   2. set up flow control for the current level;
 *   3. launch the tasks of that level.
 */
int32_t schLaunchJob(SSchJob *pJob) {
  SSchLevel *curLevel = taosArrayGet(pJob->levels, pJob->levelIdx);

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));
  SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, curLevel));
  SCH_ERR_RET(schLaunchLevelTasks(pJob, curLevel));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2191
/*
 * Send TDMT_VND_DROP_TASK to every node recorded in pTask->execNodes.
 * Before each send, the task's handle is switched to that node's handle.
 * The send results are not checked.
 */
void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosArrayGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  for (int32_t idx = 0; idx < nodeNum; ++idx) {
    SSchNodeInfo *pNode = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, idx);

    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_VND_DROP_TASK);
  }

  SCH_TASK_DLOG("task has %d exec address", nodeNum);
}

/*
 * Drop every task stored in `list` on the nodes it executed on.
 * The values in `list` are SSchTask pointers. Nothing is done unless
 * SCH_IS_NEED_DROP_JOB(pJob) holds.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_IS_NEED_DROP_JOB(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); pIter != NULL; pIter = taosHashIterate(list, pIter)) {
    SSchTask *pTask = *(SSchTask **)pIter;
    schDropTaskOnExecutedNode(pJob, pTask);
  }
}
H
Haojun Liao 已提交
2229

D
dapan1121 已提交
2230 2231 2232 2233
/*
 * Issue drop requests for the tasks of every task hash of the job:
 * executing first, then succeeded, then failed.
 */
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *taskLists[] = {pJob->execTasks, pJob->succTasks, pJob->failTasks};

  for (size_t i = 0; i < sizeof(taskLists) / sizeof(taskLists[0]); ++i) {
    schDropTaskInHashList(pJob, taskLists[i]);
  }
}
2235

D
dapan1121 已提交
2236
/*
 * Cancel a running job.
 *
 * Not implemented yet: this is a no-op that always reports success.
 * TODO: move all tasks from the exec list to the fail list.
 */
int32_t schCancelJob(SSchJob *pJob) {
  (void)pJob;
  return TSDB_CODE_SUCCESS;
}

void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
L
Liu Jicong 已提交
2249
  int64_t  refId = pJob->refId;
D
dapan1121 已提交
2250 2251 2252 2253 2254 2255 2256

  if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
    schCancelJob(pJob);
  }

  schDropJobAllTasks(pJob);

L
Liu Jicong 已提交
2257 2258
  pJob->subPlans = NULL;  // it is a reference to pDag->pSubplans

D
dapan1121 已提交
2259
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
L
Liu Jicong 已提交
2260
  for (int32_t i = 0; i < numOfLevels; ++i) {
D
dapan1121 已提交
2261 2262
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

D
dapan1121 已提交
2263
    schFreeFlowCtrl(pLevel);
L
Liu Jicong 已提交
2264

D
dapan1121 已提交
2265
    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
L
Liu Jicong 已提交
2266 2267
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
2268 2269 2270 2271 2272
      schFreeTask(pTask);
    }

    taosArrayDestroy(pLevel->subTasks);
  }
L
Liu Jicong 已提交
2273

D
dapan1121 已提交
2274 2275 2276
  taosHashCleanup(pJob->execTasks);
  taosHashCleanup(pJob->failTasks);
  taosHashCleanup(pJob->succTasks);
L
Liu Jicong 已提交
2277

D
dapan1121 已提交
2278 2279
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
L
Liu Jicong 已提交
2280

D
dapan1121 已提交
2281 2282
  qExplainFreeCtx(pJob->explainCtx);

wafwerar's avatar
wafwerar 已提交
2283 2284
  taosMemoryFreeClear(pJob->resData);
  taosMemoryFreeClear(pJob);
D
dapan1121 已提交
2285

L
Liu Jicong 已提交
2286
  qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
2287 2288
}

L
Liu Jicong 已提交
2289
/*
 * Build a job from pDag, launch it, and publish its ref id through *job.
 *
 * An empty or NULL pNodeList is only logged, not rejected. With syncSchedule set,
 * the caller blocks on pJob->rspSem until it is posted. On success the reference
 * held on the job is dropped with schReleaseJob() before returning. On any
 * failure the job is destroyed with schFreeJobImpl() and the error is returned.
 *
 * NOTE(review): if schInitJob() has already registered the job in schMgmt.jobRef
 * when schLaunchJob() fails, freeing it directly here may leave a stale entry in
 * the ref table. Confirm against schInitJob().
 */
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                              int64_t startTs, bool syncSchedule) {
  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  bool emptyNodeList = (pNodeList == NULL) || (taosArrayGetSize(pNodeList) <= 0);
  if (emptyNodeList) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
  }

  int32_t  code = 0;
  SSchJob *pJob = NULL;

  SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));
  SCH_ERR_JRET(schLaunchJob(pJob));

  *job = pJob->refId;

  if (syncSchedule) {
    // synchronous mode: block until a response signals rspSem
    SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
  schReleaseJob(pJob->refId);
  return TSDB_CODE_SUCCESS;

_return:
  schFreeJobImpl(pJob);
  SCH_RET(code);
}
D
dapan1121 已提交
2321

D
dapan1121 已提交
2322
/*
 * Handle a static EXPLAIN: the explain result is produced locally by
 * qExecStaticExplain() instead of scheduling any task.
 *
 * A lightweight job holding the explain result in resData is created, registered
 * in schMgmt.jobRef, and marked PARTIAL_SUCCEED so a later fetch can return the
 * result. Its ref id is written to *job.
 *
 * transport, pNodeList and syncSchedule are unused here; they exist for signature
 * parity with schExecJobImpl. Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                             bool syncSchedule) {
  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->sql = sql;
  pJob->attr.queryJob = true;
  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->queryId = pDag->queryId;
  pJob->subPlans = pDag->pSubplans;

  SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));

  int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    // The ref table owns the job now and *job is never set, so nobody else could
    // ever remove it. Removing the ref hands the job to the schFreeJobImpl callback
    // instead of leaking it. pJob must not be touched after this call.
    taosRemoveRef(schMgmt.jobRef, refId);
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
  *job = pJob->refId;
  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  // Only reached before the job was registered in the ref table, so free it directly.
  schFreeJobImpl(pJob);
  SCH_RET(code);
}

D
dapan1121 已提交
2370
/*
 * Initialize the global scheduler manager (schMgmt).
 *
 * cfg: optional configuration. When NULL, or when a limit is left at 0 / not
 *      positive, SCHEDULE_DEFAULT_MAX_JOB_NUM and
 *      SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM are used.
 *
 * Opens the job ref table (with schFreeJobImpl as its free callback), creates the
 * heartbeat connection hash, and generates the scheduler id.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT if already initialized, or an error code on
 * failure. On failure all partially created state is rolled back and
 * schMgmt.jobRef is reset to 0, so initialization can be retried.
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  if (schMgmt.jobRef) {
    qError("scheduler already initialized");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (cfg) {
    schMgmt.cfg = *cfg;

    if (schMgmt.cfg.maxJobNum == 0) {
      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    }
    if (schMgmt.cfg.maxNodeTableNum <= 0) {
      schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
    }
  } else {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
  }

  schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
  if (schMgmt.jobRef < 0) {
    qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum);
    // a negative id would make the next schedulerInit() report "already initialized"
    schMgmt.jobRef = 0;
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  schMgmt.hbConnections = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == schMgmt.hbConnections) {
    qError("taosHashInit hb connections failed");
    taosCloseRef(schMgmt.jobRef);
    schMgmt.jobRef = 0;
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
    // log first so errno is reported before any cleanup call can overwrite it
    qError("generate schdulerId failed, errno:%d", errno);
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
    taosCloseRef(schMgmt.jobRef);
    schMgmt.jobRef = 0;
    SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
  }

  qInfo("scheduler %" PRIx64 " initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2412
/*
 * Synchronously execute the query plan pDag.
 *
 * Static EXPLAIN plans are answered locally through schExecStaticExplain().
 * Everything else goes through schExecJobImpl() in synchronous mode. The job's ref
 * id is written to *pJob. pRes receives the job's error code and result row count.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT on bad arguments, and
 * TSDB_CODE_SCH_STATUS_ERROR if the job disappeared from the ref table before its
 * result could be read. Otherwise returns the execution error, or
 * TSDB_CODE_SUCCESS.
 */
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
                         int64_t startTs, SQueryResult *pRes) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
    SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
  } else {
    SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
  }

  // The exec helpers drop their own reference before returning, so the job may
  // already have been removed (e.g. freed concurrently). Never dereference an
  // unchecked acquire result.
  SSchJob *job = schAcquireJob(*pJob);
  if (NULL == job) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, *pJob);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pRes->code = atomic_load_32(&job->errCode);
  pRes->numOfRows = job->resNumOfRows;

  schReleaseJob(*pJob);

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2434
/*
 * Start executing pDag without waiting for its response.
 *
 * Static EXPLAIN plans are answered locally. Other plans are launched through
 * schExecJobImpl() in asynchronous mode with a start timestamp of 0. The job's ref
 * id is written to *pJob.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT on bad arguments, otherwise the result of
 * the launch.
 */
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
  bool invalidInput = (NULL == transport) || (NULL == pDag) || (NULL == pDag->pSubplans) || (NULL == pJob);
  if (invalidInput) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (EXPLAIN_MODE_STATIC != pDag->explainInfo.mode) {
    SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false));
  } else {
    SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2448
#if 0
/*
 * Legacy helpers, currently compiled out and kept for reference only.
 *
 * schedulerConvertDagToTaskList: turns a single-level DAG into one STaskInfo
 * (target address + serialized SSubQueryMsg) per subplan.
 * schedulerCopyTask: clones one STaskInfo copyNum times, giving each copy a new
 * task id.
 */
int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
  if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // only single-level plans are supported
  int32_t levelNum = LIST_LENGTH(pDag->pSubplans);
  if (levelNum != 1) {
    qError("invalid level num: %d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SNodeListNode *pPlans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, 0);
  int32_t taskNum = LIST_LENGTH(pPlans->pNodeList);
  if (taskNum <= 0) {
    qError("invalid task num: %d", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo));
  if (info == NULL) {
    qError("taosArrayInit %d taskInfo failed", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  STaskInfo tInfo = {0};
  char     *msg = NULL;
  int32_t   msgLen = 0;
  int32_t   code = 0;

  for (int32_t i = 0; i < taskNum; ++i) {
    SSubplan *pPlan = (SSubplan*)nodesListGetNode(pPlans->pNodeList, i);
    tInfo.addr = pPlan->execNode;

    code = qSubPlanToString(pPlan, &msg, &msgLen);
    if (code != TSDB_CODE_SUCCESS) {
      qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);
      SCH_ERR_JRET(code);
    }

    int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;
    if (msg == NULL) {
      qError("calloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    // NOTE(review): the calloc result below is not NULL-checked.
    SSubQueryMsg* pMsg = taosMemoryCalloc(1, msgSize);

    pMsg->header.vgId = tInfo.addr.nodeId;
    pMsg->sId         = schMgmt.sId;
    pMsg->queryId     = pPlan->id.queryId;
    pMsg->taskId      = schGenUUID();
    pMsg->taskType    = TASK_TYPE_PERSISTENT;
    pMsg->phyLen      = msgLen;
    pMsg->sqlLen      = 0;
    memcpy(pMsg->msg, msg, msgLen);
    /*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/

    tInfo.msg = pMsg;

    if (taosArrayPush(info, &tInfo) == NULL) {
      qError("taosArrayPush failed, idx:%d", i);
      // NOTE(review): frees the serialized plan string, not pMsg; pMsg looks leaked here.
      taosMemoryFree(msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  // hand ownership to the caller; the cleanup below then sees NULL
  *pTasks = info;
  info = NULL;

_return:
  schedulerFreeTaskList(info);
  SCH_RET(code);
}

int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
  if (NULL == src || NULL == dst || copyNum <= 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;

  *dst = taosArrayInit(copyNum, sizeof(STaskInfo));
  if (*dst == NULL) {
    qError("taosArrayInit %d taskInfo failed", copyNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  int32_t   msgSize = src->msg->phyLen + sizeof(*src->msg);
  STaskInfo info = {0};
  info.addr = src->addr;

  for (int32_t i = 0; i < copyNum; ++i) {
    info.msg = taosMemoryMalloc(msgSize);
    if (info.msg == NULL) {
      qError("malloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(info.msg, src->msg, msgSize);
    info.msg->taskId = schGenUUID();  // every copy needs its own task id

    if (taosArrayPush(*dst, &info) == NULL) {
      qError("taosArrayPush failed, idx:%d", i);
      taosMemoryFree(info.msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:
  schedulerFreeTaskList(*dst);
  *dst = NULL;

  SCH_RET(code);
}
#endif
D
dapan1121 已提交
2570

L
Liu Jicong 已提交
2571
int32_t schedulerFetchRows(int64_t job, void **pData) {
D
dapan1121 已提交
2572
  if (NULL == pData) {
D
dapan1121 已提交
2573
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
2574 2575
  }

L
Liu Jicong 已提交
2576
  int32_t  code = 0;
D
dapan1121 已提交
2577
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2578 2579 2580 2581
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2582

D
dapan1121 已提交
2583 2584
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (status == JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
2585
    SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2586
    schReleaseJob(job);
D
dapan1121 已提交
2587
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
2588 2589
  }

D
dapan1121 已提交
2590
  if (!SCH_JOB_NEED_FETCH(pJob)) {
D
dapan1121 已提交
2591
    SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
D
dapan1121 已提交
2592
    schReleaseJob(job);
D
dapan1121 已提交
2593
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
2594 2595
  }

D
dapan1121 已提交
2596 2597
  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
D
dapan1121 已提交
2598
    schReleaseJob(job);
D
dapan1121 已提交
2599
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
2600 2601
  }

D
dapan1121 已提交
2602
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2603
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2604 2605
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
  } else if (status == JOB_TASK_STATUS_SUCCEED) {
D
dapan1121 已提交
2606
    SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2607 2608
    goto _return;
  } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan1121 已提交
2609
    if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
D
dapan1121 已提交
2610 2611
      SCH_ERR_JRET(schFetchFromRemote(pJob));
      tsem_wait(&pJob->rspSem);
H
Hongze Cheng 已提交
2612
    }
D
dapan1121 已提交
2613 2614 2615
  } else {
    SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan 已提交
2616 2617
  }

D
dapan1121 已提交
2618
  status = SCH_GET_JOB_STATUS(pJob);
D
dapan 已提交
2619

D
dapan1121 已提交
2620
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2621
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2622
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
D
dapan 已提交
2623
  }
L
Liu Jicong 已提交
2624

D
dapan1121 已提交
2625
  if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
D
dapan1121 已提交
2626
    SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
D
dapan 已提交
2627 2628
  }

D
dapan1121 已提交
2629
  while (true) {
D
dapan1121 已提交
2630 2631
    *pData = atomic_load_ptr(&pJob->resData);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
D
dapan1121 已提交
2632 2633 2634 2635 2636
      continue;
    }

    break;
  }
D
dapan 已提交
2637

D
dapan1121 已提交
2638
  if (NULL == *pData) {
wafwerar's avatar
wafwerar 已提交
2639
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
2640 2641 2642 2643 2644
    if (rsp) {
      rsp->completed = 1;
    }

    *pData = rsp;
D
dapan1121 已提交
2645
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
D
dapan1121 已提交
2646
  }
D
dapan1121 已提交
2647

2648
  SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
D
dapan1121 已提交
2649 2650 2651 2652

_return:

  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
L
Liu Jicong 已提交
2653

D
dapan1121 已提交
2654
  schReleaseJob(job);
D
dapan 已提交
2655

D
dapan1121 已提交
2656
  SCH_RET(code);
D
dapan 已提交
2657
}
D
dapan1121 已提交
2658

D
dapan1121 已提交
2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673
/*
 * Append an SQuerySubDesc {task id, task status} for every task of job `job` to
 * pSub. Levels are walked from the highest index down to 0.
 *
 * pSub: caller-owned array of SQuerySubDesc; must not be NULL.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT if pSub is NULL, and
 * TSDB_CODE_SCH_STATUS_ERROR if the job is unknown or has no levels. Returns
 * TSDB_CODE_QRY_OUT_OF_MEMORY if appending fails.
 *
 * The reference acquired on the job is released on every path. Previously it was
 * leaked on success and on the "not initialized" path, which kept the job alive
 * forever.
 */
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
  if (NULL == pSub) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t  code = 0;
  SSchJob *pJob = schAcquireJob(job);
  if (NULL == pJob) {
    qDebug("acquire job from jobRef list failed, may not started or dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) {
    qDebug("job not initialized or not executable job, refId:%" PRIx64, job);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask     *pTask = taosArrayGet(pLevel->subTasks, m);
      SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};

      if (NULL == taosArrayPush(pSub, &subDesc)) {
        qError("taosArrayPush task status failed, refId:%" PRIx64, job);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

_return:

  schReleaseJob(job);
  SCH_RET(code);
}

D
dapan1121 已提交
2686
int32_t scheduleCancelJob(int64_t job) {
D
dapan1121 已提交
2687
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2688 2689 2690 2691
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2692

D
dapan1121 已提交
2693 2694
  int32_t code = schCancelJob(pJob);

D
dapan1121 已提交
2695
  schReleaseJob(job);
D
dapan1121 已提交
2696 2697

  SCH_RET(code);
D
dapan1121 已提交
2698 2699
}

D
dapan1121 已提交
2700
void schedulerFreeJob(int64_t job) {
D
dapan1121 已提交
2701
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2702
  if (NULL == pJob) {
D
dapan1121 已提交
2703
    qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
D
dapan 已提交
2704 2705
    return;
  }
D
dapan1121 已提交
2706

D
dapan1121 已提交
2707 2708
  if (atomic_load_8(&pJob->userFetch) > 0) {
    schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
2709
  }
D
dapan1121 已提交
2710

D
dapan1121 已提交
2711
  SCH_JOB_DLOG("start to remove job from jobRef list, refId:%" PRIx64, job);
2712

D
dapan1121 已提交
2713 2714
  if (taosRemoveRef(schMgmt.jobRef, job)) {
    SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job);
2715
  }
D
dapan1121 已提交
2716 2717

  schReleaseJob(job);
D
dapan1121 已提交
2718
}
D
dapan1121 已提交
2719 2720 2721 2722 2723 2724 2725 2726 2727

/*
 * Free an array of STaskInfo: each entry's msg buffer is released, then the
 * array itself is destroyed. A NULL taskList is ignored.
 */
void schedulerFreeTaskList(SArray *taskList) {
  if (taskList == NULL) {
    return;
  }

  for (int32_t idx = 0, total = taosArrayGetSize(taskList); idx < total; ++idx) {
    STaskInfo *pInfo = taosArrayGet(taskList, idx);
    taosMemoryFreeClear(pInfo->msg);
  }

  taosArrayDestroy(taskList);
}
L
Liu Jicong 已提交
2733

D
dapan1121 已提交
2734
void schedulerDestroy(void) {
D
dapan1121 已提交
2735 2736
  if (schMgmt.jobRef) {
    SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
D
dapan1121 已提交
2737
    int64_t refId = 0;
C
Cary Xu 已提交
2738

D
dapan1121 已提交
2739
    while (pJob) {
D
dapan1121 已提交
2740
      refId = pJob->refId;
C
Cary Xu 已提交
2741 2742 2743
      if (refId == 0) {
        break;
      }
D
dapan1121 已提交
2744
      taosRemoveRef(schMgmt.jobRef, pJob->refId);
L
Liu Jicong 已提交
2745

D
dapan1121 已提交
2746
      pJob = taosIterateRef(schMgmt.jobRef, refId);
D
dapan1121 已提交
2747
    }
L
Liu Jicong 已提交
2748

D
dapan1121 已提交
2749 2750
    taosCloseRef(schMgmt.jobRef);
    schMgmt.jobRef = 0;
D
dapan1121 已提交
2751 2752
  }
}