scheduler.c 85.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

L
Liu Jicong 已提交
16
#include "catalog.h"
H
Hongze Cheng 已提交
17
#include "command.h"
L
Liu Jicong 已提交
18
#include "query.h"
D
dapan1121 已提交
19
#include "schedulerInt.h"
H
Hongze Cheng 已提交
20
#include "tmsg.h"
D
dapan1121 已提交
21
#include "tref.h"
D
dapan1121 已提交
22
#include "trpc.h"
23

D
dapan1121 已提交
24
SSchedulerMgmt schMgmt = {
25
    .jobRef = -1,
D
dapan1121 已提交
26
};
D
dapan1121 已提交
27

L
Liu Jicong 已提交
28
/* Look up the job registered under refId in schMgmt.jobRef via taosAcquireRef. Returns NULL if the lookup fails. */
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
  void *pObj = taosAcquireRef(schMgmt.jobRef, refId);
  return (SSchJob *)pObj;
}
D
dapan1121 已提交
29

L
Liu Jicong 已提交
30
/* Release a job previously obtained through schAcquireJob; forwards taosReleaseRef's result unchanged. */
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
  int32_t rc = taosReleaseRef(schMgmt.jobRef, refId);
  return rc;
}
D
dapan1121 已提交
31

L
Liu Jicong 已提交
32
/* Hand out the next task id by atomically incrementing schMgmt.taskId and returning the incremented value. */
uint64_t schGenTaskId(void) {
  uint64_t nextId = atomic_add_fetch_64(&schMgmt.taskId, 1);
  return nextId;
}
D
dapan1121 已提交
33

L
Liu Jicong 已提交
34
#if 0
D
dapan1121 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
/*
 * Build a 64-bit id from (high to low bits):
 *   12 bits of a hash of the system UUID | 12 bits of the pid | 24 bits of the ms timestamp | 16 bits of a serial.
 * The UUID hash is computed lazily on first use; if the UUID cannot be read an error is logged and the hash
 * stays 0, so the next call tries again.
 * NOTE: in this file the definition sits inside an `#if 0` region, i.e. it is currently compiled out.
 */
uint64_t schGenUUID(void) {
  static uint64_t hostHash = 0;
  static int32_t  serialSeq = 0;

  if (0 == hostHash) {
    char    uid[64];
    int32_t rc = taosGetSystemUUID(uid, tListLen(uid));
    if (TSDB_CODE_SUCCESS == rc) {
      hostHash = MurmurHash3_32(uid, strlen(uid));
    } else {
      qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
  }

  int64_t  nowMs = taosGetTimestampMs();
  uint64_t procId = taosGetPId();
  int32_t  serial = atomic_add_fetch_32(&serialSeq, 1);

  uint64_t hostBits = (hostHash & 0x0FFF) << 52;
  uint64_t pidBits = (procId & 0x0FFF) << 40;
  uint64_t tsBits = (nowMs & 0xFFFFFF) << 16;
  uint64_t serialBits = (serial & 0xFFFF);

  return hostBits | pidBits | tsBits | serialBits;
}
L
Liu Jicong 已提交
56
#endif
D
dapan1121 已提交
57

L
Liu Jicong 已提交
58 59 60
/*
 * Initialize a task for subplan pPlan at level pLevel:
 * binds plan/level, marks it NOT_START, assigns a fresh id from schGenTaskId and
 * allocates the execNodes array (initial capacity SCH_MAX_CANDIDATE_EP_NUM).
 * pJob is only referenced by the logging macro.
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY if execNodes cannot be allocated.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  pTask->plan = pPlan;
  pTask->level = pLevel;
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
  pTask->taskId = schGenTaskId();

  pTask->execNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo));
  if (pTask->execNodes != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
  SCH_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

D
dapan1121 已提交
72
/*
 * Create a scheduler job for the query plan pDag and register it in schMgmt.jobRef.
 *
 * pNodeList (optional) is duplicated; transport and sql are stored by pointer, not copied.
 * For explain jobs the explain context is started with startTs.
 *
 * On success *pSchJob points to the job, which has already been acquired once through
 * schAcquireJob (presumably the caller balances this with schReleaseJob — confirm against callers).
 * On failure the partially built job is freed, either directly when it was never added to the ref
 * pool or through taosRemoveRef otherwise, and the error code is returned.
 */
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
                   int64_t startTs, bool syncSchedule) {
  int32_t  code = 0;
  int64_t  refId = -1;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->attr.syncSchedule = syncSchedule;
  pJob->transport = transport;
  pJob->sql = sql;

  if (pNodeList != NULL) {
    pJob->nodeList = taosArrayDup(pNodeList);
    // A failed copy must not be mistaken for "no node list": candidate selection later relies on it.
    if (NULL == pJob->nodeList) {
      qError("QID:%" PRIx64 " taosArrayDup %d nodeList failed", pDag->queryId,
             (int32_t)taosArrayGetSize(pNodeList));
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs));
  }

  pJob->execTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->succTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->succTasks) {
    SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->failTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->failTasks) {
    SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&pJob->rspSem, 0, 0);

  refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_NOT_START;

  *pSchJob = pJob;

  return TSDB_CODE_SUCCESS;

_return:

  // Before registration the job is owned here; afterwards the ref pool owns it and frees it on removal.
  if (refId < 0) {
    schFreeJobImpl(pJob);
  } else {
    taosRemoveRef(schMgmt.jobRef, refId);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
153 154 155 156 157 158 159 160
/*
 * Release everything held by an RPC context: every SRpcCtxVal stored in pCtx->args is passed to its
 * freeFunc, the args hash itself is cleaned up, and brokenVal is released through its freeFunc.
 * A NULL pCtx is a no-op. The SRpcCtx struct itself is not freed here.
 */
void schFreeRpcCtx(SRpcCtx *pCtx) {
  if (NULL == pCtx) {
    return;
  }

  void *pIter = taosHashIterate(pCtx->args, NULL);
  while (pIter) {
    SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;

    // Guarded the same way brokenVal.freeFunc is below; a value without a free callback is simply skipped.
    if (ctxVal->freeFunc) {
      (*ctxVal->freeFunc)(ctxVal->val);
    }

    pIter = taosHashIterate(pCtx->args, pIter);
  }

  taosHashCleanup(pCtx->args);

  if (pCtx->brokenVal.freeFunc) {
    (*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
  }
}

L
Liu Jicong 已提交
173
/*
 * Free the heap resources owned by a task: candidate address list, pending message buffer,
 * children/parents relation arrays and the execNodes list. The SSchTask struct itself is not freed.
 * Every released field is reset to NULL, matching how msg is cleared by taosMemoryFreeClear, so the
 * task never holds dangling pointers afterwards.
 */
void schFreeTask(SSchTask *pTask) {
  if (pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
    pTask->candidateAddrs = NULL;
  }

  taosMemoryFreeClear(pTask->msg);

  if (pTask->children) {
    taosArrayDestroy(pTask->children);
    pTask->children = NULL;
  }

  if (pTask->parents) {
    taosArrayDestroy(pTask->parents);
    pTask->parents = NULL;
  }

  if (pTask->execNodes) {
    taosArrayDestroy(pTask->execNodes);
    pTask->execNodes = NULL;
  }
}

D
dapan1121 已提交
193 194 195 196 197 198
/*
 * Report whether the job has reached a status in which no further work should be scheduled
 * (failed, cancelled, cancelling, dropping or succeeded). When pStatus is non-NULL the status
 * that was examined is written to it.
 */
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t curStatus = SCH_GET_JOB_STATUS(pJob);
  if (NULL != pStatus) {
    *pStatus = curStatus;
  }

  switch (curStatus) {
    case JOB_TASK_STATUS_FAILED:
    case JOB_TASK_STATUS_CANCELLED:
    case JOB_TASK_STATUS_CANCELLING:
    case JOB_TASK_STATUS_DROPPING:
    case JOB_TASK_STATUS_SUCCEED:
      return true;
    default:
      return false;
  }
}

D
dapan1121 已提交
204
/*
 * Check that a response of type msgType is acceptable for pTask given the last request type it sent
 * and its current status. On acceptance the task's last-message type is reset to -1.
 *
 * - LINK_BROKEN / EXPLAIN_RSP are always accepted without touching task state.
 * - QUERY_RSP is tolerated even on mismatches (only debug-logged) because it may arrive after READY_RSP.
 * - RES_READY_RSP, FETCH_RSP and the write responses require a matching last request and an
 *   EXECUTING / PARTIAL_SUCCEED task; otherwise TSDB_CODE_SCH_STATUS_ERROR is returned.
 * - Any other type is rejected with TSDB_CODE_QRY_INVALID_INPUT.
 *
 * The request type is assumed to be msgType - 1 (except RES_READY_RSP, which answers TDMT_VND_QUERY).
 */
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
  int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
  int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  int32_t reqMsgType = msgType - 1;
  switch (msgType) {
    case TDMT_SCH_LINK_BROKEN:
    case TDMT_VND_EXPLAIN_RSP:
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_QUERY_RSP:  // query_rsp may be processed later than ready_rsp
      if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
        SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
                      TMSG_INFO(msgType));
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_RES_READY_RSP:
      reqMsgType = TDMT_VND_QUERY;
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
                      (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_FETCH_RSP:
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_CREATE_TABLE_RSP:
    case TDMT_VND_DROP_TABLE_RSP:
    case TDMT_VND_SUBMIT_RSP:
      break;
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (lastMsgType != reqMsgType) {
    // lastMsgType is -1 when no request is outstanding; don't hand that to TMSG_INFO
    // (same guard as the RES_READY_RSP branch above).
    SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
                  (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
    SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                  TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);

  return TSDB_CODE_SUCCESS;
}

/*
 * Atomically move the job from its current status to newStatus if the transition is allowed:
 *   NULL -> NOT_START
 *   NOT_START -> EXECUTING
 *   EXECUTING -> PARTIAL_SUCCEED | FAILED | CANCELLING | CANCELLED | DROPPING
 *   PARTIAL_SUCCEED -> FAILED | SUCCEED | DROPPING
 *   SUCCEED | FAILED | CANCELLING -> DROPPING
 * A job already CANCELLED or DROPPING yields TSDB_CODE_QRY_JOB_FREED; any other disallowed
 * transition (including "to the same status") yields TSDB_CODE_QRY_APP_ERROR.
 * The check is repeated if another thread changes the status between the read and the CAS.
 */
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);
    if (newStatus == oriStatus) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    bool allowed = false;
    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        allowed = (JOB_TASK_STATUS_NOT_START == newStatus);
        break;
      case JOB_TASK_STATUS_NOT_START:
        allowed = (JOB_TASK_STATUS_EXECUTING == newStatus);
        break;
      case JOB_TASK_STATUS_EXECUTING:
        allowed = (JOB_TASK_STATUS_PARTIAL_SUCCEED == newStatus || JOB_TASK_STATUS_FAILED == newStatus ||
                   JOB_TASK_STATUS_CANCELLING == newStatus || JOB_TASK_STATUS_CANCELLED == newStatus ||
                   JOB_TASK_STATUS_DROPPING == newStatus);
        break;
      case JOB_TASK_STATUS_PARTIAL_SUCCEED:
        allowed = (JOB_TASK_STATUS_FAILED == newStatus || JOB_TASK_STATUS_SUCCEED == newStatus ||
                   JOB_TASK_STATUS_DROPPING == newStatus);
        break;
      case JOB_TASK_STATUS_SUCCEED:
      case JOB_TASK_STATUS_FAILED:
      case JOB_TASK_STATUS_CANCELLING:
        allowed = (JOB_TASK_STATUS_DROPPING == newStatus);
        break;
      case JOB_TASK_STATUS_CANCELLED:
      case JOB_TASK_STATUS_DROPPING:
        // The job is already being torn down; report that instead of a generic transition error.
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;
      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        break;
    }

    if (!allowed) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    // Only commit if nobody changed the status since it was validated above; otherwise re-check.
    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      return TSDB_CODE_SUCCESS;
    }
  }

_return:

  SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  SCH_RET(code);
}

D
dapan1121 已提交
358
/*
 * Wire up the parent/child task pointers of every task from its subplan's pChildren/pParents lists,
 * using planToTask (subplan pointer -> SSchTask*) to translate plans into tasks.
 *
 * Structural checks:
 * - a task at pJob->levelIdx (the lowest level) must not have children;
 * - tasks at level 0 must have no parents, tasks at any other level must have at least one;
 * - for query jobs level 0 must contain at most one task.
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR on a malformed plan and TSDB_CODE_QRY_OUT_OF_MEMORY on
 * allocation failure. Arrays already allocated on tasks are left for the task free path.
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pRootLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob) && pRootLevel->taskNum > 1) {
    SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pRootLevel->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
440
/*
 * Remember the candidate address the task is currently using (candidateAddrs[candidateIdx]) as the
 * node it succeeded on. Returns TSDB_CODE_SCH_INTERNAL_ERROR when that index has no address.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (pAddr != NULL) {
    pTask->succeedAddr = *pAddr;
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                (int32_t)taosArrayGetSize(pTask->candidateAddrs));
  SCH_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}

D
dapan1121 已提交
453 454
/*
 * Append (addr, handle) to the task's execNodes list so the node the task was sent to is tracked.
 * The address is copied by value; handle is stored as-is. Returns TSDB_CODE_QRY_OUT_OF_MEMORY if the
 * append fails.
 */
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) {
  SSchNodeInfo nodeInfo = {
      .addr = *addr,
      .handle = handle,
  };

  void *pSlot = taosArrayPush(pTask->execNodes, &nodeInfo);
  if (NULL == pSlot) {
    SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno);
    SCH_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode recorded, handle:%p", handle);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
465

X
Xiaoyu Wang 已提交
466
/*
 * Validate the query plan and build the job's level/task structure from it.
 *
 * For each level in pDag->pSubplans a SSchLevel is appended to pJob->levels, and for each subplan in
 * that level a task is initialized (schInitTask) and stored in the level's subTasks array. A temporary
 * plan -> task map is then used to link parents/children (schBuildTaskRalation) and is always freed.
 *
 * Sets pJob->queryId, levels, levelNum, levelIdx (= levelNum - 1), subPlans and taskNum.
 * Returns TSDB_CODE_QRY_INVALID_INPUT for an empty or malformed plan and
 * TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure.
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SHashObj *planToTask = taosHashInit(
      SCHEDULE_DEFAULT_MAX_TASK_NUM,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  pJob->subPlans = pDag->pSubplans;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = &task;

      SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));

      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        // The task never reached subTasks, so nothing else would free the execNodes schInitTask allocated.
        taosArrayDestroy(task.execNodes);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      // Key is the subplan pointer itself; value is the address of the task copy inside subTasks.
      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTask failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:
  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
574 575
/*
 * Fill pTask->candidateAddrs with the nodes the task may be sent to, unless it is already set.
 *
 * If the subplan carries an explicit execNode (numOfEps > 0) that single address is used; otherwise up
 * to SCH_MAX_CANDIDATE_EP_NUM entries are taken from pJob->nodeList. candidateIdx is reset to 0.
 *
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure and TSDB_CODE_QRY_INVALID_INPUT when no
 * candidate is available. On any failure candidateAddrs is reset to NULL. Otherwise a later call would
 * see a non-NULL (but empty/partial) list and return success without retrying.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t addNum = 0;
  int32_t nodeNum = 0;

  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d candidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  if (pJob->nodeList) {
    nodeNum = (int32_t)taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;

_return:
  // Restore the "not set yet" state so the early-return guard above doesn't accept a broken list later.
  taosArrayDestroy(pTask->candidateAddrs);
  pTask->candidateAddrs = NULL;

  SCH_RET(code);
}
D
dapan1121 已提交
630

D
dapan1121 已提交
631
/*
 * Register pTask in pJob->execTasks, keyed by its taskId.
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR if the task is already registered and
 * TSDB_CODE_QRY_OUT_OF_MEMORY if the hash insert fails for any other reason.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putRes = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == putRes) {
    SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(putRes)) {
    SCH_TASK_ELOG("task already in execTask list, code:%x", putRes);
    SCH_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
  SCH_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

D
dapan1121 已提交
648 649
/*
 * Move pTask from pJob->execTasks to pJob->succTasks (both keyed by taskId).
 *
 * Removal from execTasks is best-effort: a missing entry is only warned about.
 * *moved is always written: true when the task is in succTasks afterwards (including when it was
 * already there, which still returns TSDB_CODE_SCH_STATUS_ERROR), false otherwise.
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY if inserting into succTasks fails.
 */
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Initialize the out-param up front (as schMoveTaskToFailList does) so the OOM path leaves it defined.
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
674 675
/*
 * Move pTask from pJob->execTasks to pJob->failTasks (both keyed by taskId).
 *
 * Removal from execTasks is best-effort (a missing entry is only warned about).
 * *moved starts as false and becomes true once the task is in failTasks, including when it was already
 * there (that case returns TSDB_CODE_SCH_STATUS_ERROR). A failed insert returns TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  int32_t removeRes = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (removeRes != 0) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t putRes = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == putRes) {
    *moved = true;
    SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(putRes)) {
    *moved = true;
    SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
  SCH_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

D
dapan1121 已提交
701 702
/*
 * Move pTask from pJob->succTasks back to pJob->execTasks (both keyed by taskId).
 *
 * Removal from succTasks is best-effort (a missing entry is only warned about).
 * *moved is always written: true when the task is in execTasks afterwards (including when it was
 * already there, which still returns TSDB_CODE_SCH_STATUS_ERROR), false otherwise.
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY if inserting into execTasks fails.
 */
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Initialize the out-param up front (as schMoveTaskToFailList does) so the OOM path leaves it defined.
  *moved = false;

  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
726
/*
 * Decides whether pTask should be retried after failing with errCode, and stores the answer in *needRetry.
 *
 * Every call counts as one more attempt (pTask->tryTimes is incremented first). No retry is granted when:
 *   - the job is stopping,
 *   - tryTimes reached REQUEST_MAX_TRY_TIMES,
 *   - errCode is not a retryable error (NEED_SCHEDULER_RETRY_ERROR), or
 *   - all eps (data source task) or all candidate addresses (other tasks) have been tried.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int8_t jobStatus = 0;

  ++pTask->tryTimes;
  *needRetry = false;

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(jobStatus));
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) {
    SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  // TODO CHECK epList/condidateList
  if (!SCH_IS_DATA_SRC_TASK(pTask)) {
    int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
    if (pTask->candidateIdx + 1 >= candidateNum) {
      SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                    pTask->candidateIdx, candidateNum);
      return TSDB_CODE_SUCCESS;
    }
  } else if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
    SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes,
                  SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
    return TSDB_CODE_SUCCESS;
  }

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

/*
 * Relaunches pTask after a retryable failure.
 *
 * Decrements the level's launched-task counter. Under flow control, it also returns the task's quota
 * and launches queued tasks. It then advances to the next target (switches the ep set for data source
 * tasks, otherwise the next candidate address) and launches the task again.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  if (!SCH_IS_DATA_SRC_TASK(pTask)) {
    ++pTask->candidateIdx;
  } else {
    SCH_SWITCH_EPSET(&pTask->plan->execNode);
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
792
/*
 * Replaces the transport info of the heartbeat connection registered for epId.
 *
 * Returns TSDB_CODE_QRY_APP_ERROR when no heartbeat entry exists for epId. The copy into the entry
 * happens under the entry's write lock.
 */
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  SSchHbTrans *hbTrans = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (hbTrans == NULL) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hbTrans->lock);
  memcpy(&hbTrans->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &hbTrans->lock);

  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId,
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828
/*
 * Records errCode as the job's error code.
 *
 * A success code is ignored. If the job has no error yet, the first compare-and-swap wins. Otherwise
 * the stored code is only overwritten when the new code is one the client must handle
 * (NEED_CLIENT_HANDLE_ERROR) and the stored code is not such a code.
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (errCode == TSDB_CODE_SUCCESS) {
    return;
  }

  int32_t current = atomic_load_32(&pJob->errCode);
  if (current == TSDB_CODE_SUCCESS) {
    if (atomic_val_compare_exchange_32(&pJob->errCode, current, errCode) == current) {
      SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
      return;
    }

    // Lost the race: another thread stored an error first; re-read it.
    current = atomic_load_32(&pJob->errCode);
  }

  // A client-handled error already stored is never replaced.
  if (NEED_CLIENT_HANDLE_ERROR(current)) {
    return;
  }

  // Only a client-handled error may replace an ordinary one.
  if (!NEED_CLIENT_HANDLE_ERROR(errCode)) {
    return;
  }

  atomic_store_32(&pJob->errCode, errCode);
  SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
}

/*
 * Moves the job to `status` (a failure-like status), records errCode, and wakes a waiting caller.
 *
 * If schCheckAndUpdateJobStatus rejects the transition, its error is returned and nothing else is done.
 * rspSem is posted when a user fetch is pending or the job is scheduled synchronously. Returns the
 * job's recorded errCode, which may differ from errCode (see schUpdateJobErrCode).
 */
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));

  schUpdateJobErrCode(pJob, errCode);

  bool hasWaiter = atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule;
  if (hasWaiter) {
    tsem_post(&pJob->rspSem);
  }

  int32_t jobCode = atomic_load_32(&pJob->errCode);

  SCH_JOB_DLOG("job failed with error: %s", tstrerror(jobCode));

  SCH_RET(jobCode);
}

D
dapan1121 已提交
859
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
860 861 862 863 864 865 866 867 868
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode));
}

// Marks the job DROPPING with errCode; see schProcessOnJobFailureImpl.
// Callers must not do further error processing on the result: it is handled inside.
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode));
}

D
dapan1121 已提交
869
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
870 871
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  int32_t code = 0;
L
Liu Jicong 已提交
872

D
dapan1121 已提交
873
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED));
D
dapan 已提交
874

D
dapan1121 已提交
875
  if (pJob->attr.syncSchedule) {
D
dapan 已提交
876
    tsem_post(&pJob->rspSem);
D
dapan 已提交
877
  }
L
Liu Jicong 已提交
878

D
dapan1121 已提交
879 880 881
  if (atomic_load_8(&pJob->userFetch)) {
    SCH_ERR_JRET(schFetchFromRemote(pJob));
  }
D
dapan 已提交
882

D
dapan 已提交
883
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
884 885 886

_return:

D
dapan1121 已提交
887
  SCH_RET(schProcessOnJobFailure(pJob, code));
D
dapan 已提交
888 889
}

890
/*
 * Completes a fetch: clears the in-flight remote fetch flag (1 -> 0) and posts rspSem so the
 * waiting fetch caller can proceed.
 */
void schProcessOnDataFetched(SSchJob *job) {
  SSchJob *pFetchedJob = job;

  atomic_val_compare_exchange_32(&pFetchedJob->remoteFetch, 1, 0);
  tsem_post(&pFetchedJob->rspSem);
}

D
dapan1121 已提交
895
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
896
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
D
dapan1121 已提交
897
  int8_t status = 0;
L
Liu Jicong 已提交
898

D
dapan1121 已提交
899
  if (schJobNeedToStop(pJob, &status)) {
D
dapan1121 已提交
900
    SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
901 902 903
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

L
Liu Jicong 已提交
904 905
  bool    needRetry = false;
  bool    moved = false;
D
dapan1121 已提交
906
  int32_t taskDone = 0;
D
dapan1121 已提交
907
  int32_t code = 0;
D
dapan1121 已提交
908

H
Haojun Liao 已提交
909
  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
L
Liu Jicong 已提交
910

D
dapan1121 已提交
911
  SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));
L
Liu Jicong 已提交
912

D
dapan1121 已提交
913
  if (!needRetry) {
H
Haojun Liao 已提交
914
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
D
dapan1121 已提交
915 916

    if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
D
dapan1121 已提交
917 918
      SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));
    } else {
D
dapan1121 已提交
919
      SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
D
dapan1121 已提交
920
      SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
921 922 923
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
L
Liu Jicong 已提交
924

D
dapan1121 已提交
925
    if (SCH_IS_WAIT_ALL_JOB(pJob)) {
D
dapan1121 已提交
926 927 928 929 930
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskFailed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

D
dapan1121 已提交
931
      schUpdateJobErrCode(pJob, errCode);
L
Liu Jicong 已提交
932

D
dapan1121 已提交
933
      if (taskDone < pTask->level->taskNum) {
L
Liu Jicong 已提交
934
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
D
dapan 已提交
935
        SCH_RET(errCode);
D
dapan1121 已提交
936 937 938
      }
    }
  } else {
D
dapan1121 已提交
939
    SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));
L
Liu Jicong 已提交
940

D
dapan 已提交
941 942
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
943

D
dapan1121 已提交
944 945
_return:

D
dapan1121 已提交
946
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
D
dapan1121 已提交
947 948
}

D
dapan1121 已提交
949
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
950
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
L
Liu Jicong 已提交
951
  bool    moved = false;
D
dapan1121 已提交
952 953
  int32_t code = 0;

D
dapan1121 已提交
954
  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
D
dapan 已提交
955

D
dapan1121 已提交
956
  SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
D
dapan1121 已提交
957

D
dapan1121 已提交
958
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
D
dapan1121 已提交
959

D
dapan1121 已提交
960
  SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
D
dapan1121 已提交
961 962

  SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
L
Liu Jicong 已提交
963

D
dapan1121 已提交
964
  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
D
dapan 已提交
965
  if (parentNum == 0) {
L
Liu Jicong 已提交
966
    int32_t taskDone = 0;
D
dapan1121 已提交
967
    if (SCH_IS_WAIT_ALL_JOB(pJob)) {
D
dapan1121 已提交
968 969 970 971
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
L
Liu Jicong 已提交
972

D
dapan1121 已提交
973
      if (taskDone < pTask->level->taskNum) {
S
Shengliang Guan 已提交
974
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
D
dapan1121 已提交
975
        return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
976
      } else if (taskDone > pTask->level->taskNum) {
D
dapan1121 已提交
977
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
D
dapan1121 已提交
978 979
      }

D
dapan1121 已提交
980
      if (pTask->level->taskFailed > 0) {
D
dapan1121 已提交
981 982 983
        SCH_RET(schProcessOnJobFailure(pJob, 0));
      } else {
        SCH_RET(schProcessOnJobPartialSuccess(pJob));
D
dapan1121 已提交
984 985
      }
    } else {
D
dapan1121 已提交
986
      pJob->resNode = pTask->succeedAddr;
D
dapan1121 已提交
987
    }
D
dapan 已提交
988

D
dapan1121 已提交
989
    pJob->fetchTask = pTask;
D
dapan1121 已提交
990

D
dapan1121 已提交
991
    SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved));
L
Liu Jicong 已提交
992

D
dapan1121 已提交
993
    SCH_RET(schProcessOnJobPartialSuccess(pJob));
D
dapan 已提交
994 995
  }

L
Liu Jicong 已提交
996 997 998 999
  /*
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
      strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
      job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
D
dapan 已提交
1000

L
Liu Jicong 已提交
1001 1002 1003
      ++job->dataSrcEps.numOfEps;
    }
  */
D
dapan 已提交
1004

D
dapan 已提交
1005
  for (int32_t i = 0; i < parentNum; ++i) {
D
dapan1121 已提交
1006
    SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
L
Liu Jicong 已提交
1007
    int32_t   readyNum = atomic_add_fetch_32(&par->childReady, 1);
D
dapan 已提交
1008

D
dapan1121 已提交
1009
    SCH_LOCK(SCH_WRITE, &par->lock);
L
Liu Jicong 已提交
1010 1011 1012 1013
    SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
                                    .taskId = pTask->taskId,
                                    .schedId = schMgmt.sId,
                                    .addr = pTask->succeedAddr};
X
Xiaoyu Wang 已提交
1014
    qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
D
dapan1121 已提交
1015
    SCH_UNLOCK(SCH_WRITE, &par->lock);
L
Liu Jicong 已提交
1016

D
dapan 已提交
1017
    if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
D
dapan1121 已提交
1018
      SCH_ERR_RET(schLaunchTaskImpl(pJob, par));
D
dapan 已提交
1019 1020 1021 1022 1023
    }
  }

  return TSDB_CODE_SUCCESS;

D
dapan1121 已提交
1024
_return:
D
dapan 已提交
1025

D
dapan1121 已提交
1026 1027
  SCH_RET(schProcessOnJobFailure(pJob, code));
}
D
dapan 已提交
1028

D
dapan1121 已提交
1029 1030 1031
/*
 * Starts a remote fetch for the job's fetch task, unless one is already running or a result is
 * already buffered.
 *
 * pJob->remoteFetch is used as a 0/1 in-flight flag. A send failure clears the flag and is reported
 * through schProcessOnTaskFailure.
 */
// Note: no more error processing, handled in function internal
int32_t schFetchFromRemote(SSchJob *pJob) {
  int32_t code = 0;

  if (0 != atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1)) {
    SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
    return TSDB_CODE_SUCCESS;
  }

  void *existingRes = atomic_load_ptr(&pJob->resData);
  if (NULL != existingRes) {
    atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
    SCH_JOB_DLOG("res already fetched, res:%p", existingRes);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
  return TSDB_CODE_SUCCESS;

_return:
  atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}

D
dapan1121 已提交
1057 1058
/*
 * Publishes the final explain result pRsp as the job's result, marks the task SUCCEED and signals
 * the waiting fetcher. numOfRows arrives in network byte order and is converted with htonl.
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  int32_t rowNum = htonl(pRsp->numOfRows);

  SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", rowNum, pRsp->completed);

  atomic_store_32(&pJob->resNumOfRows, rowNum);
  atomic_store_ptr(&pJob->resData, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);

  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1070
// Note: no more task error processing, handled in function internal
L
Liu Jicong 已提交
1071 1072
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
                             int32_t rspCode) {
D
dapan1121 已提交
1073
  int32_t code = 0;
L
Liu Jicong 已提交
1074 1075
  int8_t  status = 0;

D
dapan1121 已提交
1076
  if (schJobNeedToStop(pJob, &status)) {
L
Liu Jicong 已提交
1077 1078
    SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
                  rspCode);
D
dapan1121 已提交
1079 1080
    SCH_RET(atomic_load_32(&pJob->errCode));
  }
H
Haojun Liao 已提交
1081

D
dapan1121 已提交
1082 1083
  SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));

D
dapan1121 已提交
1084
  switch (msgType) {
H
Haojun Liao 已提交
1085
    case TDMT_VND_CREATE_TABLE_RSP: {
X
Xiaoyu Wang 已提交
1086 1087
      SVCreateTbBatchRsp batchRsp = {0};
      if (msg) {
1088 1089 1090 1091 1092 1093
        SCoder coder = {0};
        tCoderInit(&coder, TD_LITTLE_ENDIAN, msg, msgSize, TD_DECODER);
        code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
            SVCreateTbRsp *rsp = batchRsp.pRsps + i;
X
Xiaoyu Wang 已提交
1094
            if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
1095
              tCoderClear(&coder);
X
Xiaoyu Wang 已提交
1096
              SCH_ERR_JRET(rsp->code);
1097 1098
            } else if (TSDB_CODE_SUCCESS != rsp->code) {
              code = rsp->code;
D
dapan 已提交
1099 1100
            }
          }
D
dapan1121 已提交
1101
        }
1102 1103
        tCoderClear(&coder);
        SCH_ERR_JRET(code);
L
Liu Jicong 已提交
1104 1105
      }

L
Liu Jicong 已提交
1106 1107 1108 1109
      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
X
Xiaoyu Wang 已提交
1110 1111 1112 1113 1114 1115
    case TDMT_VND_DROP_TABLE_RSP: {
      SVDropTbBatchRsp batchRsp = {0};
      if (msg) {
        SCoder coder = {0};
        tCoderInit(&coder, TD_LITTLE_ENDIAN, msg, msgSize, TD_DECODER);
        code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
1116 1117 1118
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
            SVDropTbRsp *rsp = batchRsp.pRsps + i;
X
Xiaoyu Wang 已提交
1119 1120 1121
            if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
              tCoderClear(&coder);
              SCH_ERR_JRET(rsp->code);
1122 1123
            } else if (TSDB_CODE_SUCCESS != rsp->code) {
              code = rsp->code;
X
Xiaoyu Wang 已提交
1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134
            }
          }
        }
        tCoderClear(&coder);
        SCH_ERR_JRET(code);
      }

      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
D
dapan1121 已提交
1135
    case TDMT_VND_SUBMIT_RSP: {
X
Xiaoyu Wang 已提交
1136 1137 1138 1139
      if (msg) {
        SSubmitRsp *rsp = (SSubmitRsp *)msg;
        SCH_ERR_JRET(rsp->code);
      }
1140

L
Liu Jicong 已提交
1141
      SCH_ERR_JRET(rspCode);
D
dapan1121 已提交
1142

L
Liu Jicong 已提交
1143 1144 1145 1146
      SSubmitRsp *rsp = (SSubmitRsp *)msg;
      if (rsp) {
        pJob->resNumOfRows += rsp->affectedRows;
      }
D
dapan1121 已提交
1147

L
Liu Jicong 已提交
1148
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
D
dapan1121 已提交
1149

L
Liu Jicong 已提交
1150 1151
      break;
    }
D
dapan1121 已提交
1152
    case TDMT_VND_QUERY_RSP: {
L
Liu Jicong 已提交
1153 1154
      SQueryTableRsp rsp = {0};
      if (msg) {
D
dapan1121 已提交
1155
        SCH_ERR_JRET(tDeserializeSQueryTableRsp(msg, msgSize, &rsp));
L
Liu Jicong 已提交
1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167
        SCH_ERR_JRET(rsp.code);
      }

      SCH_ERR_JRET(rspCode);

      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      // SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));

      break;
L
Liu Jicong 已提交
1168
    }
D
dapan1121 已提交
1169
    case TDMT_VND_RES_READY_RSP: {
L
Liu Jicong 已提交
1170 1171 1172 1173 1174
      SResReadyRsp *rsp = (SResReadyRsp *)msg;

      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
1175
      }
L
Liu Jicong 已提交
1176 1177
      SCH_ERR_JRET(rsp->code);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
D
dapan1121 已提交
1178

L
Liu Jicong 已提交
1179 1180
      break;
    }
D
dapan1121 已提交
1181 1182 1183 1184 1185
    case TDMT_VND_EXPLAIN_RSP: {
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
H
Hongze Cheng 已提交
1186

D
dapan1121 已提交
1187 1188 1189 1190 1191 1192 1193 1194 1195 1196
      if (!SCH_IS_EXPLAIN_JOB(pJob)) {
        SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (pJob->resData) {
        SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }

D
dapan1121 已提交
1197 1198 1199 1200 1201 1202
      SExplainRsp rsp = {0};
      if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
        taosMemoryFree(rsp.subplanInfo);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

D
dapan1121 已提交
1203
      SRetrieveTableRsp *pRsp = NULL;
D
dapan1121 已提交
1204
      SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));
D
dapan1121 已提交
1205 1206

      if (pRsp) {
D
dapan1121 已提交
1207
        SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
D
dapan1121 已提交
1208 1209 1210
      }
      break;
    }
L
Liu Jicong 已提交
1211 1212
    case TDMT_VND_FETCH_RSP: {
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
D
dapan1121 已提交
1213

L
Liu Jicong 已提交
1214 1215 1216 1217
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
D
dapan1121 已提交
1218

D
dapan1121 已提交
1219
      if (SCH_IS_EXPLAIN_JOB(pJob)) {
H
Hongze Cheng 已提交
1220
        if (rsp->completed) {
D
dapan1121 已提交
1221 1222 1223 1224 1225
          SRetrieveTableRsp *pRsp = NULL;
          SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
          if (pRsp) {
            SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
          }
H
Hongze Cheng 已提交
1226

D
dapan1121 已提交
1227 1228 1229
          return TSDB_CODE_SUCCESS;
        }

D
dapan1121 已提交
1230 1231
        atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

D
dapan1121 已提交
1232 1233 1234 1235 1236
        SCH_ERR_JRET(schFetchFromRemote(pJob));

        return TSDB_CODE_SUCCESS;
      }

X
Xiaoyu Wang 已提交
1237 1238
      if (pJob->resData) {
        SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData);
wafwerar's avatar
wafwerar 已提交
1239
        taosMemoryFreeClear(rsp);
L
Liu Jicong 已提交
1240 1241
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }
H
Haojun Liao 已提交
1242

X
Xiaoyu Wang 已提交
1243
      atomic_store_ptr(&pJob->resData, rsp);
L
Liu Jicong 已提交
1244
      atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
D
dapan1121 已提交
1245

L
Liu Jicong 已提交
1246 1247
      if (rsp->completed) {
        SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
D
dapan1121 已提交
1248
      }
L
Liu Jicong 已提交
1249 1250 1251 1252 1253 1254

      SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

      schProcessOnDataFetched(pJob);
      break;
    }
D
dapan1121 已提交
1255
    case TDMT_VND_DROP_TASK_RSP: {
L
Liu Jicong 已提交
1256 1257 1258 1259 1260
      // SHOULD NEVER REACH HERE
      SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
    }
D
dapan1121 已提交
1261 1262 1263 1264
    case TDMT_SCH_LINK_BROKEN:
      SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
      break;
D
dapan1121 已提交
1265
    default:
D
dapan1121 已提交
1266
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
D
dapan1121 已提交
1267
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
1268 1269 1270 1271 1272
  }

  return TSDB_CODE_SUCCESS;

_return:
D
dapan1121 已提交
1273

D
dapan1121 已提交
1274
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
D
dapan1121 已提交
1275 1276
}

D
dapan1121 已提交
1277
/*
 * Looks up taskId in pTaskList and stores the task in *pTask on a hit.
 *
 * *pTask is left untouched when the list is empty or the task is absent, so callers should
 * pre-initialize it (e.g. to NULL). Always returns TSDB_CODE_SUCCESS.
 */
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
  int32_t taskCount = taosHashGetSize(pTaskList);

  if (taskCount > 0) {
    SSchTask **slot = taosHashGet(pTaskList, &taskId, sizeof(taskId));
    if (slot != NULL && *slot != NULL) {
      *pTask = *slot;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1293
/*
 * Stores the connection handle on the task's exec node.
 *
 * The handle is only recorded for a successful response (rspCode == 0) and when the task has exactly
 * one exec node. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
  bool singleNode = (0 == rspCode) && (NULL != pTask->execNodes) && (1 == taosArrayGetSize(pTask->execNodes));

  if (singleNode) {
    SSchNodeInfo *onlyNode = taosArrayGet(pTask->execNodes, 0);
    onlyNode->handle = handle;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1305
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
L
Liu Jicong 已提交
1306
  int32_t                code = 0;
D
dapan1121 已提交
1307
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
X
Xiaoyu Wang 已提交
1308
  SSchTask              *pTask = NULL;
L
Liu Jicong 已提交
1309

D
dapan1121 已提交
1310
  SSchJob *pJob = schAcquireJob(pParam->refId);
D
dapan1121 已提交
1311
  if (NULL == pJob) {
D
dapan1121 已提交
1312
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64,
L
Liu Jicong 已提交
1313
          pParam->queryId, pParam->taskId, pParam->refId);
D
dapan1121 已提交
1314
    SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
1315 1316
  }

D
dapan1121 已提交
1317 1318 1319 1320 1321
  schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
  if (NULL == pTask) {
    if (TDMT_VND_EXPLAIN_RSP == msgType) {
      schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
    } else {
H
Hongze Cheng 已提交
1322 1323
      SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                   pParam->taskId);
D
dapan1121 已提交
1324 1325
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
D
dapan1121 已提交
1326
  }
H
Hongze Cheng 已提交
1327

D
dapan1121 已提交
1328
  if (NULL == pTask) {
H
Hongze Cheng 已提交
1329 1330
    SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                 pParam->taskId);
D
dapan1121 已提交
1331 1332
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }
D
dapan1121 已提交
1333

D
dapan1121 已提交
1334
  SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
L
Liu Jicong 已提交
1335

L
Liu Jicong 已提交
1336
  SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
D
dapan1121 已提交
1337
  schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
D
dapan1121 已提交
1338
  SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
D
dapan1121 已提交
1339

H
Haojun Liao 已提交
1340
_return:
D
dapan1121 已提交
1341
  if (pJob) {
D
dapan1121 已提交
1342
    schReleaseJob(pParam->refId);
D
dapan1121 已提交
1343 1344
  }

wafwerar's avatar
wafwerar 已提交
1345
  taosMemoryFreeClear(param);
D
dapan1121 已提交
1346 1347 1348
  SCH_RET(code);
}

L
Liu Jicong 已提交
1349
/* Transport callback for TDMT_VND_SUBMIT: handled as a TDMT_VND_SUBMIT_RSP by schHandleCallback. */
int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
H
Haojun Liao 已提交
1352

L
Liu Jicong 已提交
1353
/* Transport callback for TDMT_VND_CREATE_TABLE: handled as a TDMT_VND_CREATE_TABLE_RSP by schHandleCallback. */
int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}

X
Xiaoyu Wang 已提交
1357 1358 1359 1360
/* Transport callback for TDMT_VND_DROP_TABLE: handled as a TDMT_VND_DROP_TABLE_RSP by schHandleCallback. */
int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code);
}

L
Liu Jicong 已提交
1361
/* Transport callback for TDMT_VND_QUERY: handled as a TDMT_VND_QUERY_RSP by schHandleCallback. */
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
H
Haojun Liao 已提交
1364

L
Liu Jicong 已提交
1365
/* Transport callback for TDMT_VND_FETCH: handled as a TDMT_VND_FETCH_RSP by schHandleCallback. */
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
H
Haojun Liao 已提交
1368

L
Liu Jicong 已提交
1369
/* Transport callback for TDMT_VND_RES_READY: handled as a TDMT_VND_RES_READY_RSP by schHandleCallback. */
int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
H
Haojun Liao 已提交
1372

D
dapan1121 已提交
1373 1374 1375 1376
/* Transport callback for TDMT_VND_EXPLAIN: handled as a TDMT_VND_EXPLAIN_RSP by schHandleCallback. */
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_EXPLAIN_RSP, code);
}

L
Liu Jicong 已提交
1377
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1378
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
L
Liu Jicong 已提交
1379
  qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
1380
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1381 1382
}

L
Liu Jicong 已提交
1383
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1384 1385 1386 1387
  if (code) {
    qError("hb rsp error:%s", tstrerror(code));
    SCH_ERR_RET(code);
  }
L
Liu Jicong 已提交
1388

D
dapan1121 已提交
1389 1390 1391 1392 1393 1394
  SSchedulerHbRsp rsp = {0};
  if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
    qError("invalid hb rsp msg, size:%d", pMsg->len);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

D
dapan1121 已提交
1395 1396
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;

D
dapan1121 已提交
1397 1398 1399
  SSchTrans trans = {0};
  trans.transInst = pParam->transport;
  trans.transHandle = pMsg->handle;
L
Liu Jicong 已提交
1400

D
dapan1121 已提交
1401
  SCH_ERR_RET(schUpdateHbConnection(&rsp.epId, &trans));
D
dapan1121 已提交
1402 1403

  int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
L
Liu Jicong 已提交
1404 1405
  qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
         rsp.epId.ep.port);
D
dapan1121 已提交
1406

D
dapan1121 已提交
1407 1408
  for (int32_t i = 0; i < taskNum; ++i) {
    STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
L
Liu Jicong 已提交
1409

D
dapan1121 已提交
1410 1411
    SSchJob *pJob = schAcquireJob(taskStatus->refId);
    if (NULL == pJob) {
L
Liu Jicong 已提交
1412 1413 1414
      qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
            taskStatus->queryId, taskStatus->taskId);
      // TODO DROP TASK FROM SERVER!!!!
D
dapan1121 已提交
1415 1416
      continue;
    }
L
Liu Jicong 已提交
1417

D
dapan1121 已提交
1418
    // TODO
L
Liu Jicong 已提交
1419 1420 1421

    SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId,
                 jobTaskStatusStr(taskStatus->status));
D
dapan1121 已提交
1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432

    schReleaseJob(taskStatus->refId);
  }

_return:

  tFreeSSchedulerHbRsp(&rsp);

  SCH_RET(code);
}

D
dapan1121 已提交
1433 1434 1435 1436
/*
 * Transport callback invoked when a connection breaks.
 *
 * The broken handle is released first. For a heartbeat connection, the cached handle is cleared and
 * a new heartbeat is sent to the same node. For a task connection, the event is routed through
 * schHandleCallback as a TDMT_SCH_LINK_BROKEN response carrying `code`.
 */
int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  SSchCallbackParamHeader *header = (SSchCallbackParamHeader *)param;

  rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
  qDebug("handle %p is broken", pMsg->handle);

  if (!header->isHbParam) {
    SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
    return TSDB_CODE_SUCCESS;
  }

  SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
  SSchTrans            resetTrans = {.transInst = hbParam->transport, .transHandle = NULL};

  SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &resetTrans));
  SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1452
/*
 * Maps an outgoing message type to its response callback and stores it in *fp.
 * Unknown types are logged and yield TSDB_CODE_QRY_APP_ERROR; *fp is not written in that case.
 */
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
  __async_send_cb_fn_t cb = NULL;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:     cb = schHandleCreateTableCallback; break;
    case TDMT_VND_DROP_TABLE:       cb = schHandleDropTableCallback;   break;
    case TDMT_VND_SUBMIT:           cb = schHandleSubmitCallback;      break;
    case TDMT_VND_QUERY:            cb = schHandleQueryCallback;       break;
    case TDMT_VND_RES_READY:        cb = schHandleReadyCallback;       break;
    case TDMT_VND_EXPLAIN:          cb = schHandleExplainCallback;     break;
    case TDMT_VND_FETCH:            cb = schHandleFetchCallback;       break;
    case TDMT_VND_DROP_TASK:        cb = schHandleDropCallback;        break;
    case TDMT_VND_QUERY_HEARTBEAT:  cb = schHandleHbCallback;          break;
    case TDMT_SCH_LINK_BROKEN:      cb = schHandleLinkBrokenCallback;  break;
    default:
      qError("unknown msg type for callback, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  *fp = cb;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1492
/*
 * Allocates an SMsgSendInfo for sending a msgType message on behalf of pTask.
 *
 * The send info carries a freshly allocated SSchTaskCallbackParam (query/ref/task ids and transport)
 * and the response callback chosen by schGetCallbackFp. On success, ownership passes to the caller
 * via *pMsgSendInfo. On failure, both allocations are freed and an error code is returned.
 */
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
  int32_t                code = 0;
  SSchTaskCallbackParam *cbParam = NULL;
  __async_send_cb_fn_t   cbFp = NULL;

  SMsgSendInfo *sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == sendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  cbParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == cbParam) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &cbFp));

  cbParam->queryId = pJob->queryId;
  cbParam->refId = pJob->refId;
  cbParam->taskId = SCH_TASK_ID(pTask);
  cbParam->transport = pJob->transport;

  sendInfo->param = cbParam;
  sendInfo->fp = cbFp;

  *pMsgSendInfo = sendInfo;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFree(cbParam);
  taosMemoryFree(sendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1529
void schFreeRpcCtxVal(const void *arg) {
D
dapan1121 已提交
1530 1531 1532
  if (NULL == arg) {
    return;
  }
L
Liu Jicong 已提交
1533 1534

  SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
D
dapan1121 已提交
1535 1536
  taosMemoryFreeClear(pMsgSendInfo->param);
  taosMemoryFreeClear(pMsgSendInfo);
D
dapan1121 已提交
1537
}
D
dapan1121 已提交
1538

D
dapan1121 已提交
1539
/*
 * Allocate an SSchTaskCallbackParam describing pTask of pJob (query id, ref id,
 * task id, transport) and hand it back through *pParam. The caller owns the
 * returned memory.
 */
int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchTaskCallbackParam *taskCb = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (taskCb == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taskCb->queryId = pJob->queryId;
  taskCb->refId = pJob->refId;
  taskCb->taskId = SCH_TASK_ID(pTask);
  taskCb->transport = pJob->transport;

  *pParam = taskCb;
  return TSDB_CODE_SUCCESS;
}

/*
 * Allocate an SSchHbCallbackParam for the node at pTask's current candidate
 * address (node id + current endpoint) and the job's transport, flagged with
 * head.isHbParam so clone helpers treat it as a heartbeat param.
 * The caller owns the memory returned through *pParam.
 */
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchHbCallbackParam *hbParam = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (hbParam == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SQueryNodeAddr *curAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  hbParam->head.isHbParam = true;
  hbParam->nodeEpId.nodeId = curAddr->nodeId;
  memcpy(&hbParam->nodeEpId.ep, SCH_GET_CUR_EP(curAddr), sizeof(SEp));
  hbParam->transport = pJob->transport;

  *pParam = hbParam;
  return TSDB_CODE_SUCCESS;
}

/*
 * Fill brokenVal with the value the transport uses when a link breaks.
 *
 * brokenVal->val is a new SMsgSendInfo whose param is an SSchHbCallbackParam
 * (isHb) or an SSchTaskCallbackParam (!isHb), and whose fp is the callback
 * registered for TDMT_SCH_LINK_BROKEN. The clone/free hooks are
 * schCloneSMsgSendInfo / schFreeRpcCtxVal. On success brokenVal owns the
 * allocation; on failure nothing is leaked and brokenVal is not modified.
 */
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
  int32_t              code = 0;
  int32_t              msgType = TDMT_SCH_LINK_BROKEN;
  __async_send_cb_fn_t fp = NULL;
  SMsgSendInfo        *pMsgSendInfo = NULL;

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (isHb) {
    SCH_ERR_JRET(schMakeHbCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  } else {
    SCH_ERR_JRET(schMakeTaskCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  pMsgSendInfo->fp = fp;

  brokenVal->msgType = msgType;
  brokenVal->val = pMsgSendInfo;
  brokenVal->clone = schCloneSMsgSendInfo;
  brokenVal->freeFunc = schFreeRpcCtxVal;

  return TSDB_CODE_SUCCESS;

_return:

  // pMsgSendInfo is still NULL when its own allocation failed; dereferencing it
  // to free ->param would crash.
  if (pMsgSendInfo) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1613
/*
 * Build the RPC context sent along with a TDMT_VND_QUERY request.
 *
 * pCtx->args maps TDMT_VND_RES_READY_RSP and TDMT_VND_EXPLAIN_RSP to
 * SRpcCtxVal entries holding task callback handles (see
 * schGenerateTaskCallBackAHandle). pCtx->brokenVal is filled by
 * schMakeBrokenLinkVal(..., false).
 *
 * On failure the allocations made here are released and pCtx->args is reset to
 * NULL so the caller's context does not keep a freed pointer.
 */
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t       code = 0;
  SMsgSendInfo *pReadyMsgSendInfo = NULL;
  SMsgSendInfo *pExplainMsgSendInfo = NULL;

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo));
  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));

  int32_t    msgType = TDMT_VND_RES_READY_RSP;
  SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  msgType = TDMT_VND_EXPLAIN_RSP;
  ctxVal.val = pExplainMsgSendInfo;
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));

  return TSDB_CODE_SUCCESS;

_return:

  // The hash holds SRpcCtxVal copies by value. NOTE(review): this assumes
  // taosHashCleanup does not run ctxVal.freeFunc (no free fp is set on this
  // hash), which is why the send infos are released explicitly below.
  taosHashCleanup(pCtx->args);
  pCtx->args = NULL;

  if (pReadyMsgSendInfo) {
    taosMemoryFreeClear(pReadyMsgSendInfo->param);
    taosMemoryFreeClear(pReadyMsgSendInfo);
  }

  if (pExplainMsgSendInfo) {
    taosMemoryFreeClear(pExplainMsgSendInfo->param);
    taosMemoryFreeClear(pExplainMsgSendInfo);
  }

  SCH_RET(code);
}

/*
 * Build the RPC context used by a task's heartbeat connection.
 *
 * pCtx->args gets one entry keyed by TDMT_VND_QUERY_HEARTBEAT_RSP. Its value is
 * an SMsgSendInfo carrying an SSchHbCallbackParam (node id and current endpoint
 * of pTask's candidate address, plus the job transport) and the
 * TDMT_VND_QUERY_HEARTBEAT callback. pCtx->brokenVal is filled by
 * schMakeBrokenLinkVal(..., true).
 *
 * On failure everything allocated here is released and pCtx->args is reset to NULL.
 */
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t              code = 0;
  SSchHbCallbackParam *param = NULL;
  SMsgSendInfo        *pMsgSendInfo = NULL;
  SQueryNodeAddr      *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId       epId = {0};

  epId.nodeId = addr->nodeId;
  memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (NULL == param) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  int32_t              msgType = TDMT_VND_QUERY_HEARTBEAT_RSP;
  __async_send_cb_fn_t fp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));

  // Mark as a heartbeat param, as schMakeHbCallbackParam does: schCloneCallbackParam
  // picks the struct type to copy from this flag, and this context is cloned
  // before every heartbeat send.
  param->head.isHbParam = true;
  param->nodeEpId = epId;
  param->transport = pJob->transport;

  pMsgSendInfo->param = param;
  pMsgSendInfo->fp = fp;

  SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(pCtx->args);
  pCtx->args = NULL;  // don't leave a freed pointer in the caller's context
  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1719
/*
 * Register a heartbeat connection for epId in schMgmt.hbConnections.
 *
 * A new SSchHbTrans (job transport plus an RPC context from schMakeHbRpcCtx) is
 * inserted. If an entry for epId already exists, the new context is freed and
 * *exist is set to true; this still counts as success. Any other insert
 * failure is logged and returned.
 */
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  int32_t     code = 0;
  SSchHbTrans hbTrans = {0};

  hbTrans.trans.transInst = pJob->transport;
  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hbTrans.rpcCtx));

  code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hbTrans, sizeof(SSchHbTrans));
  if (0 == code) {
    return TSDB_CODE_SUCCESS;
  }

  // The context was not stored, so it must be released here.
  schFreeRpcCtx(&hbTrans.rpcCtx);

  if (HASH_NODE_EXIST(code)) {
    *exist = true;
    return TSDB_CODE_SUCCESS;
  }

  qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
  SCH_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

/*
 * Duplicate a scheduler callback param.
 *
 * The header's isHbParam flag selects the concrete type:
 * SSchHbCallbackParam when set, SSchTaskCallbackParam otherwise. The whole
 * struct is byte-copied into a new allocation returned through *pDst.
 */
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
  bool   isHb = pSrc->isHbParam;
  size_t paramLen = isHb ? sizeof(SSchHbCallbackParam) : sizeof(SSchTaskCallbackParam);

  void *copy = taosMemoryMalloc(paramLen);
  if (NULL == copy) {
    if (isHb) {
      qError("malloc SSchHbCallbackParam failed");
    } else {
      qError("malloc SSchTaskCallbackParam failed");
    }
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(copy, pSrc, paramLen);
  *pDst = (SSchCallbackParamHeader *)copy;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1769 1770
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
  SMsgSendInfo *pSrc = src;
L
Liu Jicong 已提交
1771
  int32_t       code = 0;
D
dapan1121 已提交
1772
  SMsgSendInfo *pDst = taosMemoryMalloc(sizeof(*pSrc));
D
dapan1121 已提交
1773
  if (NULL == pDst) {
D
dapan1121 已提交
1774 1775 1776 1777
    qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1778 1779
  memcpy(pDst, pSrc, sizeof(*pSrc));
  pDst->param = NULL;
D
dapan1121 已提交
1780

D
dapan1121 已提交
1781
  SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
D
dapan1121 已提交
1782

D
dapan1121 已提交
1783
  *dst = pDst;
D
dapan1121 已提交
1784

D
dapan1121 已提交
1785
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1786

D
dapan1121 已提交
1787 1788
_return:

D
dapan1121 已提交
1789
  taosMemoryFreeClear(pDst);
D
dapan1121 已提交
1790 1791 1792 1793 1794 1795 1796
  SCH_RET(code);
}

/*
 * Deep-copy the heartbeat RPC context pSrc into pDst.
 *
 * brokenVal and every entry of pSrc->args are copied. Each SMsgSendInfo value
 * is duplicated with schCloneSMsgSendInfo. On failure whatever was already
 * built in pDst is released with schFreeRpcCtx.
 */
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
  int32_t    code = 0;
  SRpcCtxVal dst = {0};
  void      *pIter = NULL;

  memcpy(&pDst->brokenVal, &pSrc->brokenVal, sizeof(pSrc->brokenVal));
  pDst->brokenVal.val = NULL;

  SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));

  pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pDst->args) {
    qError("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pIter = taosHashIterate(pSrc->args, NULL);
  while (pIter) {
    SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
    int32_t    *msgType = taosHashGetKey(pIter, NULL);

    dst = *pVal;
    dst.val = NULL;

    // NOTE(review): relies on the thash contract that an iteration abandoned
    // before it finishes must be closed with taosHashCancelIterate; the
    // original code jumped out of the loop without doing so.
    code = schCloneSMsgSendInfo(pVal->val, &dst.val);
    if (code) {
      taosHashCancelIterate(pSrc->args, pIter);
      SCH_ERR_JRET(code);
    }

    if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) {
      qError("taosHashPut msg %d to rpcCtx failed", *msgType);
      (*dst.freeFunc)(dst.val);
      taosHashCancelIterate(pSrc->args, pIter);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pIter = taosHashIterate(pSrc->args, pIter);
  }

  return TSDB_CODE_SUCCESS;

_return:

  schFreeRpcCtx(pDst);
  SCH_RET(code);
}

L
Liu Jicong 已提交
1834 1835
/*
 * Send msg (msgSize bytes) asynchronously for pTask.
 *
 * transport points to an SSchTrans (transport instance + handle). A task
 * callback handle for msgType is created with schGenerateTaskCallBackAHandle and
 * the message is handed to asyncSendMsgToServerExt together with persistHandle
 * and the optional RPC context ctx.
 *
 * msg is referenced by the send info, not copied here. On failure only the send
 * info and its param are released; msg remains the caller's responsibility.
 */
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet *epSet, int32_t msgType, void *msg,
                        uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
  int32_t code = 0;

  SSchTrans *trans = (SSchTrans *)transport;

  SMsgSendInfo *pMsgSendInfo = NULL;
  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, msgType, &pMsgSendInfo));

  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans->transHandle;
  pMsgSendInfo->msgType = msgType;

  // ", " separator added: the refId and instance fields were previously printed run together.
  qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 ", instance:%p, handle:%p", TMSG_INFO(msgType),
         ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
         trans->transInst, trans->transHandle);

  int64_t transporterId = 0;
  code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
  if (code) {
    SCH_ERR_JRET(code);
  }

  SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType));
  return TSDB_CODE_SUCCESS;

_return:

  if (pMsgSendInfo) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1871 1872
/*
 * Send a scheduler heartbeat request (TDMT_VND_QUERY_HEARTBEAT) to nodeEpId.
 *
 * The connection must already be registered in schMgmt.hbConnections. Its
 * transport is copied and its RPC context is cloned while hb->lock is held; the
 * clone is what gets passed to the send.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR when no connection is registered for
 * nodeEpId. On any other failure, the buffers and the cloned context built here
 * are released.
 */
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
  SSchedulerHbReq        req = {0};
  int32_t                code = 0;
  SRpcCtx                rpcCtx = {0};
  SSchTrans              trans = {0};
  int32_t                msgType = TDMT_VND_QUERY_HEARTBEAT;
  int32_t                msgSize = 0;
  // Declared and NULL-initialised up front: every SCH_ERR_JRET below jumps to
  // _return, which frees these, possibly before they would otherwise be assigned.
  void                  *msg = NULL;
  SMsgSendInfo          *pMsgSendInfo = NULL;
  SSchTaskCallbackParam *param = NULL;
  __async_send_cb_fn_t   fp = NULL;
  int64_t                transporterId = 0;
  SEpSet                 epSet = {.inUse = 0, .numOfEps = 1};

  req.header.vgId = nodeEpId->nodeId;
  req.sId = schMgmt.sId;
  memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));

  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
           nodeEpId->ep.port);
    // code is still 0 here; SCH_ERR_RET(code) would not return and hb would be
    // dereferenced below.
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
  memcpy(&trans, &hb->trans, sizeof(trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  SCH_ERR_RET(code);

  msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    qError("calloc hb req %d failed", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    qError("calloc SMsgSendInfo failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == param) {
    qError("calloc SSchTaskCallbackParam failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  param->transport = trans.transInst;

  pMsgSendInfo->param = param;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans.transHandle;
  pMsgSendInfo->msgType = msgType;
  pMsgSendInfo->fp = fp;

  memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));

  qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
         nodeEpId->ep.fqdn, nodeEpId->ep.port);

  code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
  if (code) {
    qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
           trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
    SCH_ERR_JRET(code);
  }

  qDebug("hb msg sent");
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFreeClear(msg);
  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);
  schFreeRpcCtx(&rpcCtx);
  SCH_RET(code);
}

D
dapan1121 已提交
1962
/*
 * Build the request for msgType and send it asynchronously on behalf of pTask.
 *
 * addr: destination node; NULL selects pTask's current candidate address.
 * TDMT_VND_QUERY and TDMT_VND_QUERY_HEARTBEAT also build an RPC context and
 * request a persistent handle. After a successful TDMT_VND_QUERY send, the exec
 * node is recorded.
 *
 * On failure after the RPC context or the message buffer has been built, the
 * task's last-msg type is reset to -1 and both are released.
 */
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
  uint32_t msgSize = 0;
  void    *msg = NULL;
  int32_t  code = 0;
  bool     persistHandle = false;
  SRpcCtx  rpcCtx = {0};

  if (NULL == addr) {
    addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  }

  SEpSet epSet = addr->epSet;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_SUBMIT: {
      msgSize = pTask->msgLen;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      memcpy(msg, pTask->msg, msgSize);
      break;
    }

    case TDMT_VND_QUERY: {
      SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));

      uint32_t len = strlen(pJob->sql);
      msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        // rpcCtx already holds allocations: leave through _return so it is freed.
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSubQueryMsg *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      pMsg->taskType = TASK_TYPE_TEMP;
      pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
      pMsg->phyLen = htonl(pTask->msgLen);
      pMsg->sqlLen = htonl(len);

      // Payload: SQL text (sqlLen bytes) immediately followed by the plan (phyLen bytes).
      memcpy(pMsg->msg, pJob->sql, len);
      memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);

      persistHandle = true;
      break;
    }

    case TDMT_VND_RES_READY: {
      msgSize = sizeof(SResReadyReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResReadyReq *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      break;
    }

    case TDMT_VND_FETCH: {
      msgSize = sizeof(SResFetchReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResFetchReq *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      break;
    }

    case TDMT_VND_DROP_TASK: {
      msgSize = sizeof(STaskDropReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      STaskDropReq *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      break;
    }

    case TDMT_VND_QUERY_HEARTBEAT: {
      SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));

      SSchedulerHbReq req = {0};
      req.sId = schMgmt.sId;
      req.header.vgId = addr->nodeId;
      req.epId.nodeId = addr->nodeId;
      memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

      // Check the serializer's result as a signed value: msgSize is unsigned, so
      // the previous "msgSize < 0" test could never detect a failure.
      int32_t hbReqLen = tSerializeSSchedulerHbReq(NULL, 0, &req);
      if (hbReqLen < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", hbReqLen);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgSize = (uint32_t)hbReqLen;

      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_JOB_ELOG("calloc %d failed", msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      persistHandle = true;
      break;
    }

    default:
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);

  SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
  SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
                               (rpcCtx.args ? &rpcCtx : NULL)));

  if (msgType == TDMT_VND_QUERY) {
    SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
  schFreeRpcCtx(&rpcCtx);

  taosMemoryFreeClear(msg);
  SCH_RET(code);
}
D
dapan1121 已提交
2127

D
dapan1121 已提交
2128 2129
/*
 * Make sure a heartbeat connection exists for the node at pTask's current
 * candidate address. If none is registered, register one. If this call created
 * it (no concurrent registration won), send the first heartbeat as well.
 */
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *curAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  nodeEp = {0};

  nodeEp.nodeId = curAddr->nodeId;
  memcpy(&nodeEp.ep, SCH_GET_CUR_EP(curAddr), sizeof(SEp));

  if (taosHashGet(schMgmt.hbConnections, &nodeEp, sizeof(SQueryNodeEpId)) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool alreadyRegistered = false;
  SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &nodeEp, &alreadyRegistered));

  if (!alreadyRegistered) {
    SCH_ERR_RET(schBuildAndSendHbMsg(&nodeEp));
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
2148

D
dapan1121 已提交
2149
/*
 * Launch one task.
 *
 * Steps: bump the level's launched counter, and bail out with the job's errCode
 * if the job must stop. Then put the task on the exec list, lazily serialize
 * its subplan, resolve candidate addresses, and (for query jobs) ensure a
 * heartbeat connection. Finally send the plan's message.
 */
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  int8_t  status = 0;
  int32_t code = 0;

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status));
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  // NOTE: race condition: the task should be put into the hash table before send msg to server
  if (JOB_TASK_STATUS_EXECUTING != SCH_GET_TASK_STATUS(pTask)) {
    SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
  }

  SSubplan *plan = pTask->plan;

  // Serialize the subplan only once; the result is cached in pTask->msg.
  if (pTask->msg == NULL) {  // TODO add more detailed reason for failure
    code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
    if (code != TSDB_CODE_SUCCESS) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }

    SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));

  return TSDB_CODE_SUCCESS;
}

// Note: callers need no further error handling; failures are passed to schProcessOnTaskFailure inside this function.
/*
 * Reset the task handle and launch the task, subject to flow control.
 *
 * When flow control applies and schCheckIncTaskFlowQuota reports no quota, the
 * task is not launched by this call. Any error is forwarded to
 * schProcessOnTaskFailure, whose result is returned.
 */
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  bool    enough = false;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
    if (!enough) {
      // no quota available: skip the launch here
      return TSDB_CODE_SUCCESS;
    }
  }

  SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
  return TSDB_CODE_SUCCESS;

_return:
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
D
dapan1121 已提交
2214

D
dapan1121 已提交
2215
/*
 * Launch every task of the given level in order; stop at the first error and
 * return it.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  int32_t idx = 0;

  while (idx < level->taskNum) {
    SSchTask *pTask = taosArrayGet(level->subTasks, idx);
    SCH_ERR_RET(schLaunchTask(pJob, pTask));
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Start the job.
 *
 * Move the job to EXECUTING, set up flow control for the current level
 * (pJob->levelIdx), then launch that level's tasks.
 */
int32_t schLaunchJob(SSchJob *pJob) {
  SSchLevel *curLevel = taosArrayGet(pJob->levels, pJob->levelIdx);

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));
  SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, curLevel));
  SCH_ERR_RET(schLaunchLevelTasks(pJob, curLevel));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2237
/*
 * Send TDMT_VND_DROP_TASK to every node recorded in pTask->execNodes.
 *
 * For each node, the task handle is switched to that node's handle before
 * sending. The send result is not checked, so one failed drop request does not
 * stop the remaining ones.
 */
void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->execNodes == NULL) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosArrayGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  for (int32_t n = 0; n < nodeNum; ++n) {
    SSchNodeInfo *execNode = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, n);
    SCH_SET_TASK_HANDLE(pTask, execNode->handle);
    schBuildAndSendMsg(pJob, pTask, &execNode->addr, TDMT_VND_DROP_TASK);
  }

  SCH_TASK_DLOG("task has %d exec address", nodeNum);
}

/*
 * For a job that needs dropping (SCH_IS_NEED_DROP_JOB), send drop requests for
 * every task stored in `list`. The hash values are SSchTask pointers.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_IS_NEED_DROP_JOB(pJob)) {
    return;
  }

  for (void *it = taosHashIterate(list, NULL); it != NULL; it = taosHashIterate(list, it)) {
    schDropTaskOnExecutedNode(pJob, *(SSchTask **)it);
  }
}
H
Haojun Liao 已提交
2275

D
dapan1121 已提交
2276 2277 2278 2279
/*
 * Request a drop for every task of pJob, whatever its state. The task maps are
 * visited in order: executing, then succeeded, then failed.
 */
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *taskMaps[] = {pJob->execTasks, pJob->succTasks, pJob->failTasks};
  int32_t   mapNum = (int32_t)(sizeof(taskMaps) / sizeof(taskMaps[0]));

  for (int32_t k = 0; k < mapNum; ++k) {
    schDropTaskInHashList(pJob, taskMaps[k]);
  }
}
2281

D
dapan1121 已提交
2282
/*
 * Cancel a running job.
 *
 * Not implemented yet: pJob is left untouched and success is always returned.
 * TODO: move all tasks from the exec list to the fail list.
 */
int32_t schCancelJob(SSchJob *pJob) {
  (void)pJob;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2288
/*
 * Close the job reference set once the scheduler is exiting (schMgmt.exit set
 * by schedulerDestroy) and no jobs are counted in schMgmt.jobNum.
 *
 * The check and the close happen under the scheduler write lock. Resetting
 * jobRef to -1 makes later calls no-ops.
 */
void schCloseJobRef(void) {
  if (0 == atomic_load_8((int8_t *)&schMgmt.exit)) {
    return;
  }

  SCH_LOCK(SCH_WRITE, &schMgmt.lock);

  if (schMgmt.jobRef >= 0 && atomic_load_32(&schMgmt.jobNum) <= 0) {
    taosCloseRef(schMgmt.jobRef);
    schMgmt.jobRef = -1;
  }

  SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
}

D
dapan1121 已提交
2301 2302 2303 2304 2305 2306 2307
void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
L
Liu Jicong 已提交
2308
  int64_t  refId = pJob->refId;
D
dapan1121 已提交
2309 2310 2311 2312 2313 2314 2315

  if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
    schCancelJob(pJob);
  }

  schDropJobAllTasks(pJob);

L
Liu Jicong 已提交
2316 2317
  pJob->subPlans = NULL;  // it is a reference to pDag->pSubplans

D
dapan1121 已提交
2318
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
L
Liu Jicong 已提交
2319
  for (int32_t i = 0; i < numOfLevels; ++i) {
D
dapan1121 已提交
2320 2321
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

D
dapan1121 已提交
2322
    schFreeFlowCtrl(pLevel);
L
Liu Jicong 已提交
2323

D
dapan1121 已提交
2324
    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
L
Liu Jicong 已提交
2325 2326
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
2327 2328 2329 2330 2331
      schFreeTask(pTask);
    }

    taosArrayDestroy(pLevel->subTasks);
  }
L
Liu Jicong 已提交
2332

D
dapan1121 已提交
2333 2334 2335
  taosHashCleanup(pJob->execTasks);
  taosHashCleanup(pJob->failTasks);
  taosHashCleanup(pJob->succTasks);
L
Liu Jicong 已提交
2336

D
dapan1121 已提交
2337 2338
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
L
Liu Jicong 已提交
2339

D
dapan1121 已提交
2340 2341
  qExplainFreeCtx(pJob->explainCtx);

wafwerar's avatar
wafwerar 已提交
2342 2343
  taosMemoryFreeClear(pJob->resData);
  taosMemoryFreeClear(pJob);
D
dapan1121 已提交
2344

L
Liu Jicong 已提交
2345
  qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
2346 2347 2348 2349

  atomic_sub_fetch_32(&schMgmt.jobNum, 1);

  schCloseJobRef();
D
dapan1121 已提交
2350 2351
}

L
Liu Jicong 已提交
2352
/*
 * Build a job from pDag, launch it, and store its reference id in *job.
 *
 * An empty or NULL pNodeList is only logged; the job is still created. When
 * syncSchedule is true, the call blocks on pJob->rspSem until it is posted.
 * On success, the job reference (presumably taken inside schInitJob; verify)
 * is released before returning. On failure, whatever was built is handed to
 * schFreeJobImpl().
 */
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                              int64_t startTs, bool syncSchedule) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SSchJob *pJob = NULL;

  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  bool emptyNodeList = (NULL == pNodeList) || (taosArrayGetSize(pNodeList) <= 0);
  if (emptyNodeList) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
  }

  SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));
  SCH_ERR_JRET(schLaunchJob(pJob));

  *job = pJob->refId;

  if (syncSchedule) {
    SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:
  schFreeJobImpl(pJob);
  SCH_RET(code);
}
D
dapan1121 已提交
2384

D
dapan1121 已提交
2385
/*
 * Execute a static EXPLAIN.
 *
 * The result is produced locally by qExecStaticExplain() into pJob->resData;
 * no task is launched. The job is registered in schMgmt.jobRef with status
 * JOB_TASK_STATUS_PARTIAL_SUCCEED, so schedulerFetchRows() can return the
 * buffered result. Its refId is written to *job.
 *
 * transport, pNodeList and syncSchedule are unused here.
 */
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                             bool syncSchedule) {
  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // schFreeJobImpl() decrements schMgmt.jobNum for every job it frees. That
  // happens both on the _return path below and when the ref set destroys this
  // job, so the job must be counted here too. Otherwise jobNum drifts below the
  // number of live jobs and schCloseJobRef() may close the ref set too early.
  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  pJob->sql = sql;
  pJob->attr.queryJob = true;
  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->queryId = pDag->queryId;
  pJob->subPlans = pDag->pSubplans;

  SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));

  int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  // From here on the ref set owns the job, so it must not be freed directly.
  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
  *job = pJob->refId;
  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  schFreeJobImpl(pJob);
  SCH_RET(code);
}

D
dapan1121 已提交
2433
/*
 * Initialise the global scheduler state (schMgmt).
 *
 * cfg may be NULL, in which case defaults are used. A zero maxJobNum or a
 * non-positive maxNodeTableNum in cfg is also replaced by its default.
 *
 * Opens the job reference set (with schFreeJobImpl as destructor), creates the
 * heartbeat-connection hash and generates the scheduler id.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT if the scheduler is already initialised.
 * On any later failure, everything created so far is released and jobRef is
 * reset to -1, so a retry is not rejected as "already initialized".
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  if (schMgmt.jobRef >= 0) {
    qError("scheduler already initialized");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  if (cfg) {
    schMgmt.cfg = *cfg;

    if (schMgmt.cfg.maxJobNum == 0) {
      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    }
    if (schMgmt.cfg.maxNodeTableNum <= 0) {
      schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
    }
  } else {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
  }

  schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
  if (schMgmt.jobRef < 0) {
    qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  schMgmt.hbConnections = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == schMgmt.hbConnections) {
    qError("taosHashInit hb connections failed");
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _return;
  }

  if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
    // errno is logged before the cleanup below can overwrite it.
    qError("generate schdulerId failed, errno:%d", errno);
    code = TSDB_CODE_QRY_SYS_ERROR;
    goto _return;
  }

  qInfo("scheduler %" PRIx64 " initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);

  return TSDB_CODE_SUCCESS;

_return:
  // Roll back the partial initialisation.
  taosHashCleanup(schMgmt.hbConnections);
  schMgmt.hbConnections = NULL;
  taosCloseRef(schMgmt.jobRef);
  schMgmt.jobRef = -1;

  SCH_RET(code);
}

L
Liu Jicong 已提交
2475
/*
 * Execute a query plan synchronously and report its outcome in pRes.
 *
 * A static EXPLAIN is answered locally via schExecStaticExplain(); any other
 * plan goes through schExecJobImpl() in blocking mode. On success the job's
 * refId is stored in *pJob. pRes->code and pRes->numOfRows are then copied
 * from the job.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT for NULL arguments. Returns
 * TSDB_CODE_SCH_STATUS_ERROR if the job has already disappeared from the ref
 * set when its result is read back.
 */
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
                         int64_t startTs, SQueryResult *pRes) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
    SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
  } else {
    SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
  }

  // The job may be removed from the ref set (e.g. dropped) between the
  // execution above and this point. Never dereference a failed acquire.
  SSchJob *job = schAcquireJob(*pJob);
  if (NULL == job) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, *pJob);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pRes->code = atomic_load_32(&job->errCode);
  pRes->numOfRows = job->resNumOfRows;

  schReleaseJob(*pJob);

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2497
/*
 * Start executing a query plan without waiting for it to finish.
 *
 * A static EXPLAIN is handled by schExecStaticExplain(); any other plan by
 * schExecJobImpl() with startTs 0. Both run in non-blocking mode. On success
 * the job's refId is stored in *pJob.
 */
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  bool staticExplain = (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode);
  SCH_ERR_RET(staticExplain ? schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false)
                            : schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false));

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2511
/*
 * Disabled helpers. Everything up to the matching `#endif` is excluded from
 * compilation by `#if 0`:
 *  - schedulerConvertDagToTaskList: turns a single-level DAG into an array of
 *    STaskInfo, each carrying a freshly allocated SSubQueryMsg.
 *  - schedulerCopyTask: clones one task message copyNum times, giving each
 *    clone a new taskId.
 * Both depend on schGenUUID(), which is itself inside an `#if 0` region near
 * the top of this file.
 *
 * NOTE(review): if re-enabled, schedulerConvertDagToTaskList looks buggy:
 *  - `msg` from qSubPlanToString never appears to be freed after being copied
 *    into pMsg;
 *  - the taosMemoryCalloc result for pMsg is not checked;
 *  - on taosArrayPush failure it frees `msg` instead of `pMsg`;
 *  - the NULL check on `msg` is logged as a calloc failure.
 * Confirm before reviving.
 */
#if 0
X
Xiaoyu Wang 已提交
2512
int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
X
Xiaoyu Wang 已提交
2513
  if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) {
D
dapan1121 已提交
2514 2515 2516
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

X
Xiaoyu Wang 已提交
2517
  int32_t levelNum = LIST_LENGTH(pDag->pSubplans);
D
dapan1121 已提交
2518 2519 2520 2521 2522
  if (1 != levelNum) {
    qError("invalid level num: %d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

X
Xiaoyu Wang 已提交
2523 2524
  SNodeListNode *plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, 0);
  int32_t taskNum = LIST_LENGTH(plans->pNodeList);
D
dapan1121 已提交
2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541
  if (taskNum <= 0) {
    qError("invalid task num: %d", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo));
  if (NULL == info) {
    qError("taosArrayInit %d taskInfo failed", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  STaskInfo tInfo = {0};
  char *msg = NULL;
  int32_t msgLen = 0;
  int32_t code = 0;
  
  for (int32_t i = 0; i < taskNum; ++i) {
X
Xiaoyu Wang 已提交
2542
    SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, i);
D
dapan1121 已提交
2543 2544 2545
    tInfo.addr = plan->execNode;

    code = qSubPlanToString(plan, &msg, &msgLen);
D
dapan1121 已提交
2546
    if (TSDB_CODE_SUCCESS != code) {
D
dapan1121 已提交
2547 2548 2549 2550 2551 2552 2553 2554 2555 2556
      qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);
      SCH_ERR_JRET(code);
    }

    int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;
    if (NULL == msg) {
      qError("calloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
    
wafwerar's avatar
wafwerar 已提交
2557
    SSubQueryMsg* pMsg = taosMemoryCalloc(1, msgSize);
D
dapan1121 已提交
2558
    
L
Liu Jicong 已提交
2559
    pMsg->header.vgId = tInfo.addr.nodeId;
D
dapan1121 已提交
2560
    
2561 2562 2563
    pMsg->sId      = schMgmt.sId;
    pMsg->queryId  = plan->id.queryId;
    pMsg->taskId   = schGenUUID();
D
dapan1121 已提交
2564
    pMsg->taskType = TASK_TYPE_PERSISTENT;
2565 2566
    pMsg->phyLen   = msgLen;
    pMsg->sqlLen   = 0;
L
Liu Jicong 已提交
2567
    memcpy(pMsg->msg, msg, msgLen);
L
fix tq  
Liu Jicong 已提交
2568
    /*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/
D
dapan1121 已提交
2569 2570 2571 2572 2573

    tInfo.msg = pMsg;

    if (NULL == taosArrayPush(info, &tInfo)) {
      qError("taosArrayPush failed, idx:%d", i);
wafwerar's avatar
wafwerar 已提交
2574
      taosMemoryFree(msg);
D
dapan1121 已提交
2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  *pTasks = info;
  info = NULL;
  
_return:
  schedulerFreeTaskList(info);
  SCH_RET(code);
}

D
dapan1121 已提交
2587 2588 2589 2590 2591 2592
int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
  if (NULL == src || NULL == dst || copyNum <= 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
H
Haojun Liao 已提交
2593

D
dapan1121 已提交
2594 2595 2596 2597 2598 2599
  *dst = taosArrayInit(copyNum, sizeof(STaskInfo));
  if (NULL == *dst) {
    qError("taosArrayInit %d taskInfo failed", copyNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

2600
  int32_t msgSize = src->msg->phyLen + sizeof(*src->msg);
D
dapan1121 已提交
2601 2602 2603
  STaskInfo info = {0};

  info.addr = src->addr;
H
Haojun Liao 已提交
2604

D
dapan1121 已提交
2605
  for (int32_t i = 0; i < copyNum; ++i) {
wafwerar's avatar
wafwerar 已提交
2606
    info.msg = taosMemoryMalloc(msgSize);
D
dapan1121 已提交
2607 2608 2609 2610 2611 2612 2613 2614
    if (NULL == info.msg) {
      qError("malloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(info.msg, src->msg, msgSize);

    info.msg->taskId = schGenUUID();
H
Haojun Liao 已提交
2615

D
dapan1121 已提交
2616 2617
    if (NULL == taosArrayPush(*dst, &info)) {
      qError("taosArrayPush failed, idx:%d", i);
wafwerar's avatar
wafwerar 已提交
2618
      taosMemoryFree(info.msg);
D
dapan1121 已提交
2619 2620 2621 2622 2623 2624 2625 2626 2627 2628
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  schedulerFreeTaskList(*dst);
  *dst = NULL;
H
Haojun Liao 已提交
2629

D
dapan1121 已提交
2630 2631
  SCH_RET(code);
}
L
Liu Jicong 已提交
2632
#endif
D
dapan1121 已提交
2633

L
Liu Jicong 已提交
2634
int32_t schedulerFetchRows(int64_t job, void **pData) {
D
dapan1121 已提交
2635
  if (NULL == pData) {
D
dapan1121 已提交
2636
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
2637 2638
  }

L
Liu Jicong 已提交
2639
  int32_t  code = 0;
D
dapan1121 已提交
2640
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2641 2642 2643 2644
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2645

D
dapan1121 已提交
2646 2647
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (status == JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
2648
    SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2649
    schReleaseJob(job);
D
dapan1121 已提交
2650
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
2651 2652
  }

D
dapan1121 已提交
2653
  if (!SCH_JOB_NEED_FETCH(pJob)) {
D
dapan1121 已提交
2654
    SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
D
dapan1121 已提交
2655
    schReleaseJob(job);
D
dapan1121 已提交
2656
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
2657 2658
  }

D
dapan1121 已提交
2659 2660
  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
D
dapan1121 已提交
2661
    schReleaseJob(job);
D
dapan1121 已提交
2662
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
2663 2664
  }

D
dapan1121 已提交
2665
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2666
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2667 2668
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
  } else if (status == JOB_TASK_STATUS_SUCCEED) {
D
dapan1121 已提交
2669
    SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2670 2671
    goto _return;
  } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan1121 已提交
2672
    if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
D
dapan1121 已提交
2673 2674
      SCH_ERR_JRET(schFetchFromRemote(pJob));
      tsem_wait(&pJob->rspSem);
H
Hongze Cheng 已提交
2675
    }
D
dapan1121 已提交
2676 2677 2678
  } else {
    SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan 已提交
2679 2680
  }

D
dapan1121 已提交
2681
  status = SCH_GET_JOB_STATUS(pJob);
D
dapan 已提交
2682

D
dapan1121 已提交
2683
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2684
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2685
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
D
dapan 已提交
2686
  }
L
Liu Jicong 已提交
2687

D
dapan1121 已提交
2688
  if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
D
dapan1121 已提交
2689
    SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
D
dapan 已提交
2690 2691
  }

D
dapan1121 已提交
2692
  while (true) {
D
dapan1121 已提交
2693 2694
    *pData = atomic_load_ptr(&pJob->resData);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
D
dapan1121 已提交
2695 2696 2697 2698 2699
      continue;
    }

    break;
  }
D
dapan 已提交
2700

D
dapan1121 已提交
2701
  if (NULL == *pData) {
wafwerar's avatar
wafwerar 已提交
2702
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
2703 2704 2705 2706 2707
    if (rsp) {
      rsp->completed = 1;
    }

    *pData = rsp;
D
dapan1121 已提交
2708
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
D
dapan1121 已提交
2709
  }
D
dapan1121 已提交
2710

2711
  SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
D
dapan1121 已提交
2712 2713 2714 2715

_return:

  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
L
Liu Jicong 已提交
2716

D
dapan1121 已提交
2717
  schReleaseJob(job);
D
dapan 已提交
2718

D
dapan1121 已提交
2719
  SCH_RET(code);
D
dapan 已提交
2720
}
D
dapan1121 已提交
2721

D
dapan1121 已提交
2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736
/*
 * Append one SQuerySubDesc (task id and status) per task of a job to pSub.
 * Levels are walked from the highest index down to 0.
 *
 * Returns:
 *  - TSDB_CODE_QRY_INVALID_INPUT if pSub is NULL;
 *  - TSDB_CODE_SCH_STATUS_ERROR if the job is unknown or not executable;
 *  - TSDB_CODE_QRY_OUT_OF_MEMORY if appending to pSub fails.
 * The job reference taken by schAcquireJob() is released on every path;
 * previously it was never released.
 */
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
  if (NULL == pSub) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t  code = 0;
  SSchJob *pJob = schAcquireJob(job);
  if (NULL == pJob) {
    qDebug("acquire job from jobRef list failed, may not started or dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) {
    qDebug("job not initialized or not executable job, refId:%" PRIx64, job);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask     *pTask = taosArrayGet(pLevel->subTasks, m);
      SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};

      if (NULL == taosArrayPush(pSub, &subDesc)) {
        qError("taosArrayPush task status failed, refId:%" PRIx64, job);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

_return:

  schReleaseJob(job);

  SCH_RET(code);
}

D
dapan1121 已提交
2749
int32_t scheduleCancelJob(int64_t job) {
D
dapan1121 已提交
2750
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2751 2752 2753 2754
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2755

D
dapan1121 已提交
2756 2757
  int32_t code = schCancelJob(pJob);

D
dapan1121 已提交
2758
  schReleaseJob(job);
D
dapan1121 已提交
2759 2760

  SCH_RET(code);
D
dapan1121 已提交
2761 2762
}

D
dapan1121 已提交
2763
void schedulerFreeJob(int64_t job) {
D
dapan1121 已提交
2764
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2765
  if (NULL == pJob) {
D
dapan1121 已提交
2766
    qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
D
dapan 已提交
2767 2768
    return;
  }
D
dapan1121 已提交
2769

D
dapan1121 已提交
2770 2771
  if (atomic_load_8(&pJob->userFetch) > 0) {
    schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
2772
  }
D
dapan1121 已提交
2773

D
dapan1121 已提交
2774
  SCH_JOB_DLOG("start to remove job from jobRef list, refId:%" PRIx64, job);
2775

D
dapan1121 已提交
2776 2777
  if (taosRemoveRef(schMgmt.jobRef, job)) {
    SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job);
2778
  }
D
dapan1121 已提交
2779 2780

  schReleaseJob(job);
D
dapan1121 已提交
2781
}
D
dapan1121 已提交
2782 2783 2784 2785 2786 2787 2788 2789 2790

/*
 * Free an array of STaskInfo: each element's msg buffer, then the array
 * itself. A NULL taskList is a no-op.
 */
void schedulerFreeTaskList(SArray *taskList) {
  if (taskList == NULL) {
    return;
  }

  size_t count = taosArrayGetSize(taskList);
  for (size_t idx = 0; idx < count; ++idx) {
    STaskInfo *entry = taosArrayGet(taskList, idx);
    taosMemoryFreeClear(entry->msg);
  }

  taosArrayDestroy(taskList);
}
L
Liu Jicong 已提交
2796

D
dapan1121 已提交
2797
void schedulerDestroy(void) {
2798 2799
  atomic_store_8((int8_t *)&schMgmt.exit, 1);

D
dapan1121 已提交
2800
  if (schMgmt.jobRef >= 0) {
D
dapan1121 已提交
2801
    SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
H
Hongze Cheng 已提交
2802
    int64_t  refId = 0;
C
Cary Xu 已提交
2803

D
dapan1121 已提交
2804
    while (pJob) {
D
dapan1121 已提交
2805
      refId = pJob->refId;
C
Cary Xu 已提交
2806 2807 2808
      if (refId == 0) {
        break;
      }
D
dapan1121 已提交
2809
      taosRemoveRef(schMgmt.jobRef, pJob->refId);
L
Liu Jicong 已提交
2810

D
dapan1121 已提交
2811
      pJob = taosIterateRef(schMgmt.jobRef, refId);
D
dapan1121 已提交
2812
    }
D
dapan1121 已提交
2813
  }
D
dapan1121 已提交
2814 2815

  if (schMgmt.hbConnections) {
H
Hongze Cheng 已提交
2816
    void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
D
dapan1121 已提交
2817 2818 2819 2820
    while (pIter != NULL) {
      SSchHbTrans *hb = pIter;
      schFreeRpcCtx(&hb->rpcCtx);
      pIter = taosHashIterate(schMgmt.hbConnections, pIter);
H
Hongze Cheng 已提交
2821
    }
D
dapan1121 已提交
2822 2823 2824
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
  }
D
dapan1121 已提交
2825
}