scheduler.c 86.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15
 */

L
Liu Jicong 已提交
16
#include "catalog.h"
H
Hongze Cheng 已提交
17
#include "command.h"
L
Liu Jicong 已提交
18
#include "query.h"
D
dapan1121 已提交
19
#include "schedulerInt.h"
H
Hongze Cheng 已提交
20
#include "tmsg.h"
D
dapan1121 已提交
21
#include "tref.h"
D
dapan1121 已提交
22
#include "trpc.h"
23

D
dapan1121 已提交
24
SSchedulerMgmt schMgmt = {
25
    .jobRef = -1,
D
dapan1121 已提交
26
};
D
dapan1121 已提交
27

L
Liu Jicong 已提交
28
/* Take a reference on the job registered under refId in schMgmt.jobRef; NULL when it cannot be acquired. */
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
  void *pRef = taosAcquireRef(schMgmt.jobRef, refId);
  return (SSchJob *)pRef;
}
D
dapan1121 已提交
29

L
Liu Jicong 已提交
30
/* Drop a reference previously taken with schAcquireJob; returns taosReleaseRef's result. */
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
  int32_t releaseRet = taosReleaseRef(schMgmt.jobRef, refId);
  return releaseRet;
}
D
dapan1121 已提交
31

L
Liu Jicong 已提交
32
/* Produce the next task id by atomically incrementing the shared schMgmt.taskId counter. */
uint64_t schGenTaskId(void) {
  uint64_t nextId = atomic_add_fetch_64(&schMgmt.taskId, 1);
  return nextId;
}
D
dapan1121 已提交
33

L
Liu Jicong 已提交
34
#if 0
/*
 * Disabled (not compiled). Builds a 64-bit id laid out as:
 *   bits 52-63  low 12 bits of a MurmurHash3_32 of the system UUID
 *   bits 40-51  low 12 bits of the process id
 *   bits 16-39  low 24 bits of taosGetTimestampMs()
 *   bits  0-15  low 16 bits of a per-process atomic counter
 * NOTE(review): hashId is lazily initialised without synchronisation; revisit before re-enabling.
 */
uint64_t schGenUUID(void) {
  static uint64_t hashId = 0;
  static int32_t requestSerialId = 0;

  if (hashId == 0) {
    char    uid[64];
    int32_t code = taosGetSystemUUID(uid, tListLen(uid));
    if (code != TSDB_CODE_SUCCESS) {
      qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    } else {
      hashId = MurmurHash3_32(uid, strlen(uid));
    }
  }

  int64_t ts      = taosGetTimestampMs();
  uint64_t pid    = taosGetPId();
  int32_t val     = atomic_add_fetch_32(&requestSerialId, 1);

  uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
  return id;
}
#endif
D
dapan1121 已提交
57

L
Liu Jicong 已提交
58 59 60
/*
 * Fill in a freshly zeroed task: bind it to its subplan and level, mark it NOT_START,
 * assign a new task id and allocate its (initially empty) execNodes list.
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY if the execNodes array cannot be allocated.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  pTask->plan  = pPlan;
  pTask->level = pLevel;
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
  pTask->taskId = schGenTaskId();

  pTask->execNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo));
  if (pTask->execNodes != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // taskId is already assigned above, so the error log can identify the task.
  SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
72
/*
 * Allocate, build and register a scheduler job for a query plan.
 *
 * pSchJob       out: receives the new job on success; left untouched on failure.
 * pDag          query plan; converted into levels/tasks by schValidateAndBuildJob.
 * transport     stored in pJob->transport as-is.
 * pNodeList     optional candidate node list; the job keeps its own copy (taosArrayDup).
 * sql           stored by pointer, not copied.
 * startTs       forwarded to qExecExplainBegin for explain jobs.
 * needRes, syncSchedule  copied into pJob->attr.
 *
 * On success the job is registered in schMgmt.jobRef, schMgmt.jobNum is incremented and one
 * reference has already been taken with schAcquireJob (presumably the caller must drop it with
 * schReleaseJob -- verify against callers).
 * On failure the job is freed directly if it was never registered, otherwise it is removed
 * from the ref pool.
 */
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
                   int64_t startTs, bool needRes, bool syncSchedule) {
  int32_t  code = 0;
  int64_t  refId = -1;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->attr.syncSchedule = syncSchedule;
  pJob->attr.needRes = needRes;
  pJob->transport = transport;
  pJob->sql = sql;

  if (pNodeList != NULL) {
    pJob->nodeList = taosArrayDup(pNodeList);
    if (NULL == pJob->nodeList) {
      // Without this check a failed copy silently leaves the job without candidate nodes.
      qError("QID:%" PRIx64 " taosArrayDup nodeList failed", pDag->queryId);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs));
  }

  pJob->execTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->succTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->succTasks) {
    SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->failTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->failTasks) {
    SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&pJob->rspSem, 0, 0);

  refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_NOT_START;

  *pSchJob = pJob;

  return TSDB_CODE_SUCCESS;

_return:

  // Once registered, the ref pool owns the job, so removal (not a direct free) releases it.
  if (refId < 0) {
    schFreeJobImpl(pJob);
  } else {
    taosRemoveRef(schMgmt.jobRef, refId);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
154 155 156 157 158 159 160 161
/*
 * Release the contents of an RPC context. This runs the free callback of every value stored
 * in pCtx->args, destroys the args hash, and then runs the brokenVal free callback.
 * A NULL pCtx is a no-op. pCtx itself is not freed.
 */
void schFreeRpcCtx(SRpcCtx *pCtx) {
  if (NULL == pCtx) {
    return;
  }

  void *pIter = taosHashIterate(pCtx->args, NULL);
  while (pIter) {
    SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;

    // Guard the callback the same way brokenVal is guarded below; an entry without a
    // free function would otherwise be a NULL function-pointer call.
    if (ctxVal->freeFunc) {
      (*ctxVal->freeFunc)(ctxVal->val);
    }

    pIter = taosHashIterate(pCtx->args, pIter);
  }

  taosHashCleanup(pCtx->args);

  if (pCtx->brokenVal.freeFunc) {
    (*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
  }
}

L
Liu Jicong 已提交
174
/*
 * Release the heap resources owned by a task: the candidate address list, the message buffer,
 * the child/parent lists and the exec-node list. The SSchTask struct itself is not freed.
 * Each array pointer is reset to NULL after destruction, so calling this twice on the
 * same task cannot double-free (msg goes through taosMemoryFreeClear, which presumably
 * clears it as well).
 */
void schFreeTask(SSchTask *pTask) {
  if (pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
    pTask->candidateAddrs = NULL;
  }

  taosMemoryFreeClear(pTask->msg);

  if (pTask->children) {
    taosArrayDestroy(pTask->children);
    pTask->children = NULL;
  }

  if (pTask->parents) {
    taosArrayDestroy(pTask->parents);
    pTask->parents = NULL;
  }

  if (pTask->execNodes) {
    taosArrayDestroy(pTask->execNodes);
    pTask->execNodes = NULL;
  }
}

D
dapan1121 已提交
194 195 196 197 198 199
/*
 * Report whether a job has reached a state in which no further work should be scheduled
 * (failed, cancelled, cancelling, dropping or succeeded). If pStatus is non-NULL it
 * receives the status snapshot the decision was based on.
 */
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t jobStatus = SCH_GET_JOB_STATUS(pJob);
  if (NULL != pStatus) {
    *pStatus = jobStatus;
  }

  switch (jobStatus) {
    case JOB_TASK_STATUS_FAILED:
    case JOB_TASK_STATUS_CANCELLED:
    case JOB_TASK_STATUS_CANCELLING:
    case JOB_TASK_STATUS_DROPPING:
    case JOB_TASK_STATUS_SUCCEED:
      return true;
    default:
      return false;
  }
}

D
dapan1121 已提交
205
/*
 * Printable name for a task's last-sent message type. This file stores -1 as the last
 * message type after a response has been accepted. Non-positive values are not passed
 * to TMSG_INFO, which presumably looks the name up in a table indexed by message type.
 */
static const char *schLastMsgTypeStr(int32_t lastMsgType) {
  return (lastMsgType > 0) ? TMSG_INFO(lastMsgType) : "null";
}

/*
 * Check that a response of type msgType is acceptable for pTask, given the last request
 * the task sent and its current status.
 *
 * - TDMT_SCH_LINK_BROKEN / TDMT_VND_EXPLAIN_RSP: always accepted, nothing updated.
 * - TDMT_VND_QUERY_RSP: always accepted; mismatches are only debug-logged.
 * - TDMT_VND_RES_READY_RSP / TDMT_VND_FETCH_RSP: the last request must match (or be -1)
 *   and the task must be EXECUTING or PARTIAL_SUCCEED.
 * - create/drop table and submit responses: the last request must match exactly and the
 *   task must be EXECUTING or PARTIAL_SUCCEED.
 * - anything else: TSDB_CODE_QRY_INVALID_INPUT.
 *
 * The code assumes a response type is its request type + 1 (reqMsgType = msgType - 1).
 * Ready responses are the exception and are matched against TDMT_VND_QUERY.
 * Accepted responses (other than link-broken/explain) reset the task's last message
 * type to -1. Violations return TSDB_CODE_SCH_STATUS_ERROR.
 */
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
  int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
  int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  int32_t reqMsgType = msgType - 1;
  switch (msgType) {
    case TDMT_SCH_LINK_BROKEN:
    case TDMT_VND_EXPLAIN_RSP:
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_QUERY_RSP:  // query_rsp may be processed later than ready_rsp
      if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
        SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", schLastMsgTypeStr(lastMsgType),
                      TMSG_INFO(msgType));
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_RES_READY_RSP:
      reqMsgType = TDMT_VND_QUERY;
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", schLastMsgTypeStr(lastMsgType),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_FETCH_RSP:
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", schLastMsgTypeStr(lastMsgType),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_CREATE_TABLE_RSP:
    case TDMT_VND_DROP_TABLE_RSP:
    case TDMT_VND_SUBMIT_RSP:
      break;
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // lastMsgType can be -1 here (no outstanding request), so it must go through the guard.
  if (lastMsgType != reqMsgType) {
    SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", schLastMsgTypeStr(lastMsgType),
                  TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
    SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                  TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);

  return TSDB_CODE_SUCCESS;
}

/*
 * Atomically move a job from its current status to newStatus, if the transition is legal.
 *
 * Legal transitions:
 *   NULL            -> NOT_START
 *   NOT_START       -> EXECUTING
 *   EXECUTING       -> PARTIAL_SUCCEED | FAILED | CANCELLING | CANCELLED | DROPPING
 *   PARTIAL_SUCCEED -> FAILED | SUCCEED | DROPPING
 *   SUCCEED | FAILED | CANCELLING -> DROPPING
 * A job that is CANCELLED or DROPPING yields TSDB_CODE_QRY_JOB_FREED. Any other request,
 * including "transition" to the same status, yields TSDB_CODE_QRY_APP_ERROR.
 *
 * The status is swapped with a compare-and-exchange. If another thread changed it
 * between the read and the swap, the check is repeated against the fresh value.
 */
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    if (newStatus == oriStatus) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    bool allowed = false;
    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        allowed = (JOB_TASK_STATUS_NOT_START == newStatus);
        break;
      case JOB_TASK_STATUS_NOT_START:
        allowed = (JOB_TASK_STATUS_EXECUTING == newStatus);
        break;
      case JOB_TASK_STATUS_EXECUTING:
        allowed = (JOB_TASK_STATUS_PARTIAL_SUCCEED == newStatus || JOB_TASK_STATUS_FAILED == newStatus ||
                   JOB_TASK_STATUS_CANCELLING == newStatus || JOB_TASK_STATUS_CANCELLED == newStatus ||
                   JOB_TASK_STATUS_DROPPING == newStatus);
        break;
      case JOB_TASK_STATUS_PARTIAL_SUCCEED:
        allowed = (JOB_TASK_STATUS_FAILED == newStatus || JOB_TASK_STATUS_SUCCEED == newStatus ||
                   JOB_TASK_STATUS_DROPPING == newStatus);
        break;
      case JOB_TASK_STATUS_SUCCEED:
      case JOB_TASK_STATUS_FAILED:
      case JOB_TASK_STATUS_CANCELLING:
        allowed = (JOB_TASK_STATUS_DROPPING == newStatus);
        break;
      case JOB_TASK_STATUS_CANCELLED:
      case JOB_TASK_STATUS_DROPPING:
        // The job is on its way out; no transition is possible any more.
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;
      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    if (!allowed) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      return TSDB_CODE_SUCCESS;
    }
    // Lost the race with a concurrent update: re-validate against the new status.
  }

_return:

  SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  SCH_ERR_RET(code);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
359
/*
 * Fill in each task's children/parents arrays from its subplan's pChildren/pParents lists,
 * using planToTask (subplan pointer -> SSchTask*) to resolve subplans to tasks.
 *
 * Validation performed:
 *   - tasks on level pJob->levelIdx (the lowest level) must have no children;
 *   - tasks on level 0 must have no parents, and tasks on every other level need at least one;
 *   - every referenced subplan must map to a task in planToTask;
 *   - for query jobs, level 0 must contain at most one task.
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR on an inconsistent plan and
 * TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure.
 * (The misspelled name is kept for existing callers.)
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n indexes pParents here, so label it as a parent index in the log.
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) {
    SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
441
/*
 * Remember the candidate address the task is currently using (candidateAddrs[candidateIdx])
 * as its succeeded address. Fails with TSDB_CODE_SCH_INTERNAL_ERROR if that index has no entry.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pCurAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (pCurAddr != NULL) {
    pTask->succeedAddr = *pCurAddr;
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                (int32_t)taosArrayGetSize(pTask->candidateAddrs));
  SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
454 455
/*
 * Append (addr, handle) to the task's execNodes list. The address is copied by value.
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY if the push fails.
 */
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) {
  SSchNodeInfo nodeInfo = {0};
  nodeInfo.addr = *addr;
  nodeInfo.handle = handle;

  if (taosArrayPush(pTask->execNodes, &nodeInfo) != NULL) {
    SCH_TASK_DLOG("task execNode recorded, handle:%p", handle);
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
466

X
Xiaoyu Wang 已提交
467
/*
 * Validate a query plan and build the job's level/task structure from it.
 *
 * pDag->pSubplans holds one SNodeListNode per level, and each level holds one SSubplan
 * per task. For every level an SSchLevel is appended to pJob->levels. For every subplan
 * an SSchTask is initialised and appended to that level's subTasks. Parent/child links
 * are then resolved by schBuildTaskRalation through a temporary subplan->task hash.
 *
 * Sets pJob->queryId, levels, levelNum, levelIdx (= levelNum - 1), subPlans and taskNum,
 * and the job type via SCH_SET_JOB_TYPE.
 * Returns TSDB_CODE_QRY_INVALID_INPUT for an empty or malformed plan and
 * TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure. Partially built levels/tasks stay
 * attached to pJob; presumably they are released when the job is freed (confirm in schFreeJobImpl).
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Keyed by the subplan pointer value itself, hence an integer hash of pointer width.
  SHashObj *planToTask = taosHashInit(
      SCHEDULE_DEFAULT_MAX_TASK_NUM,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  pJob->subPlans = pDag->pSubplans;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = &task;  // SCH_TASK_ELOG below reads pTask

      SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));

      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        // The task never reached subTasks, so nothing else would release the array
        // schInitTask allocated for it.
        taosArrayDestroy(task.execNodes);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      // Map the subplan to the task's slot inside subTasks (p), not to the stack copy.
      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTask failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:
  // Reached on success as well (code == 0); the mapping is only needed while building.
  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
575 576
/*
 * Populate pTask->candidateAddrs (once) with the addresses the task may be sent to.
 *
 * If the task's subplan already names an exec node with at least one endpoint, that node is
 * the only candidate. Otherwise up to SCH_MAX_CANDIDATE_EP_NUM entries are copied from
 * pJob->nodeList. candidateIdx is reset to 0.
 *
 * Returns immediately with success if candidateAddrs is already set. Returns
 * TSDB_CODE_QRY_INVALID_INPUT when no candidate is available, and
 * TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure. On any failure the partially built
 * list is destroyed and candidateAddrs is reset to NULL. This keeps a later call from
 * taking the "already set" shortcut with an empty or incomplete list.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  int32_t addNum = 0;
  int32_t nodeNum = 0;

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d candidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  if (pJob->nodeList) {
    nodeNum = taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroy(pTask->candidateAddrs);
  pTask->candidateAddrs = NULL;

  SCH_RET(code);
}
D
dapan1121 已提交
631

D
dapan1121 已提交
632
/*
 * Register a task in pJob->execTasks, keyed by taskId, with the task pointer as value.
 * A duplicate entry yields TSDB_CODE_SCH_INTERNAL_ERROR; any other insert failure yields
 * TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == code) {
    SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(code)) {
    SCH_TASK_ELOG("task already in execTask list, code:%x", code);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
649 650
/*
 * Move a task from pJob->execTasks to pJob->succTasks (both keyed by taskId).
 *
 * A missing execTasks entry is only logged as a warning. *moved is set to true whenever
 * the task ends up in succTasks. That includes the case where it was already there, which
 * is still reported as TSDB_CODE_SCH_STATUS_ERROR. It stays false when the insert fails
 * with TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Initialise the out-parameter (as schMoveTaskToFailList does) so no return path leaves it unset.
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
675 676
/*
 * Move a task from pJob->execTasks to pJob->failTasks (both keyed by taskId).
 *
 * *moved starts out false and becomes true once the task is in failTasks, including when
 * it was already there (reported as TSDB_CODE_SCH_STATUS_ERROR). A missing execTasks entry
 * only produces a warning. Other insert failures return TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  int32_t removeRet = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (removeRet != 0) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == code) {
    *moved = true;
    SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(code)) {
    *moved = true;
    SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
702 703
/*
 * Move a task from pJob->succTasks back to pJob->execTasks (both keyed by taskId).
 *
 * A missing succTasks entry is only logged as a warning. *moved is set to true whenever
 * the task ends up in execTasks. That includes the case where it was already there, which
 * is still reported as TSDB_CODE_SCH_STATUS_ERROR. It stays false when the insert fails
 * with TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Initialise the out-parameter (as schMoveTaskToFailList does) so no return path leaves it unset.
  *moved = false;

  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
727
/*
 * Decide whether a failed task should be retried.
 *
 * Always increments pTask->tryTimes. *needRetry ends up true only when:
 * - the job is not stopping,
 * - the try budget (REQUEST_MAX_TRY_TIMES) is not exhausted,
 * - errCode is a retryable scheduler error, and
 * - another endpoint (data-source task) or candidate address (other tasks)
 *   is still available.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int8_t jobStatus = 0;

  ++pTask->tryTimes;
  *needRetry = false;

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(jobStatus));
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) {
    SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  // TODO CHECK epList/condidateList
  if (SCH_IS_DATA_SRC_TASK(pTask)) {
    // Data-source tasks retry on the next endpoint of their exec node.
    if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
      SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes,
                    SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
      return TSDB_CODE_SUCCESS;
    }
  } else {
    // Other tasks retry on the next entry of candidateAddrs.
    int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
    bool    exhausted = (pTask->candidateIdx + 1) >= candidateNum;
    if (exhausted) {
      SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                    pTask->candidateIdx, candidateNum);
      return TSDB_CODE_SUCCESS;
    }
  }

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

/*
 * Relaunch a task that schTaskCheckSetRetry approved for retry.
 *
 * Steps:
 * 1. Undo this task's contribution to its level's launched count.
 * 2. Under flow control, release its quota and launch queued tasks.
 * 3. Advance to the next endpoint (data-source task) or the next candidate
 *    address (other tasks).
 * 4. Launch the task again.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  bool flowControlled = SCH_TASK_NEED_FLOW_CTRL(pJob, pTask);
  if (flowControlled) {
    SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  if (!SCH_IS_DATA_SRC_TASK(pTask)) {
    ++pTask->candidateIdx;
  } else {
    SCH_SWITCH_EPSET(&pTask->plan->execNode);
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
793
/*
 * Replace the transport info of the heartbeat connection registered for epId.
 *
 * The copy into the entry's trans field is done under the entry's write lock.
 * Returns TSDB_CODE_QRY_APP_ERROR when no heartbeat connection is registered
 * for epId in schMgmt.hbConnections.
 */
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  SSchHbTrans *hbConn = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (hbConn == NULL) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hbConn->lock);
  memcpy(&hbConn->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &hbConn->lock);

  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId,
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829
/*
 * Record errCode as the job's error code, following these rules:
 * - TSDB_CODE_SUCCESS is ignored.
 * - If the job has no error yet, errCode is installed with a compare-and-swap,
 *   so the first concurrent writer wins.
 * - An existing code for which NEED_CLIENT_HANDLE_ERROR is true is never
 *   replaced.
 * - Otherwise errCode replaces the existing code only if
 *   NEED_CLIENT_HANDLE_ERROR(errCode) is true.
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (errCode == TSDB_CODE_SUCCESS) {
    return;
  }

  int32_t currentCode = atomic_load_32(&pJob->errCode);
  if (currentCode == TSDB_CODE_SUCCESS) {
    int32_t seen = atomic_val_compare_exchange_32(&pJob->errCode, TSDB_CODE_SUCCESS, errCode);
    if (seen == TSDB_CODE_SUCCESS) {
      SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
      return;
    }

    // Lost the race: another writer installed a code first; re-read it.
    currentCode = atomic_load_32(&pJob->errCode);
  }

  bool currentIsSticky = NEED_CLIENT_HANDLE_ERROR(currentCode);
  if (!currentIsSticky && NEED_CLIENT_HANDLE_ERROR(errCode)) {
    atomic_store_32(&pJob->errCode, errCode);
    SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
  }
}

/*
 * Common failure path: move the job to `status`, merge errCode into the job's
 * error code, and wake any waiter on rspSem (a pending user fetch or a
 * synchronous scheduler).
 *
 * Returns the job's resulting error code. If schCheckAndUpdateJobStatus
 * rejects the transition, that status error is returned instead.
 */
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
  // if already FAILED, no more processing
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));

  schUpdateJobErrCode(pJob, errCode);

  bool hasWaiter = atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule;
  if (hasWaiter) {
    tsem_post(&pJob->rspSem);
  }

  int32_t jobErrCode = atomic_load_32(&pJob->errCode);
  SCH_JOB_DLOG("job failed with error: %s", tstrerror(jobErrCode));

  SCH_RET(jobErrCode);
}

D
dapan1121 已提交
860
// Mark the job FAILED with errCode.
// Any further error handling is done inside schProcessOnJobFailureImpl.
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  int32_t targetStatus = JOB_TASK_STATUS_FAILED;
  SCH_RET(schProcessOnJobFailureImpl(pJob, targetStatus, errCode));
}

// Move the job to DROPPING with errCode.
// Any further error handling is done inside schProcessOnJobFailureImpl.
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  int32_t targetStatus = JOB_TASK_STATUS_DROPPING;
  SCH_RET(schProcessOnJobFailureImpl(pJob, targetStatus, errCode));
}

D
dapan1121 已提交
870
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
871 872
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  int32_t code = 0;
L
Liu Jicong 已提交
873

D
dapan1121 已提交
874
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED));
D
dapan 已提交
875

D
dapan1121 已提交
876
  if (pJob->attr.syncSchedule) {
D
dapan 已提交
877
    tsem_post(&pJob->rspSem);
D
dapan 已提交
878
  }
L
Liu Jicong 已提交
879

D
dapan1121 已提交
880 881 882
  if (atomic_load_8(&pJob->userFetch)) {
    SCH_ERR_JRET(schFetchFromRemote(pJob));
  }
D
dapan 已提交
883

D
dapan 已提交
884
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
885 886 887

_return:

D
dapan1121 已提交
888
  SCH_RET(schProcessOnJobFailure(pJob, code));
D
dapan 已提交
889 890
}

891
/*
 * Called when a fetch result is available.
 *
 * Releases the in-flight fetch slot (remoteFetch 1 -> 0) and wakes the
 * waiter on rspSem.
 */
void schProcessOnDataFetched(SSchJob *job) {
  int32_t fetchIdle = 0;
  int32_t fetchBusy = 1;

  atomic_val_compare_exchange_32(&job->remoteFetch, fetchBusy, fetchIdle);
  tsem_post(&job->rspSem);
}

D
dapan1121 已提交
896
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
897
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
D
dapan1121 已提交
898
  int8_t status = 0;
L
Liu Jicong 已提交
899

D
dapan1121 已提交
900
  if (schJobNeedToStop(pJob, &status)) {
D
dapan1121 已提交
901
    SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
902 903 904
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

L
Liu Jicong 已提交
905 906
  bool    needRetry = false;
  bool    moved = false;
D
dapan1121 已提交
907
  int32_t taskDone = 0;
D
dapan1121 已提交
908
  int32_t code = 0;
D
dapan1121 已提交
909

H
Haojun Liao 已提交
910
  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
L
Liu Jicong 已提交
911

D
dapan1121 已提交
912
  SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));
L
Liu Jicong 已提交
913

D
dapan1121 已提交
914
  if (!needRetry) {
H
Haojun Liao 已提交
915
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
D
dapan1121 已提交
916 917

    if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
D
dapan1121 已提交
918 919
      SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));
    } else {
D
dapan1121 已提交
920
      SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
D
dapan1121 已提交
921
      SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
922 923 924
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
L
Liu Jicong 已提交
925

D
dapan1121 已提交
926
    if (SCH_IS_WAIT_ALL_JOB(pJob)) {
D
dapan1121 已提交
927 928 929 930 931
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskFailed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

D
dapan1121 已提交
932
      schUpdateJobErrCode(pJob, errCode);
L
Liu Jicong 已提交
933

D
dapan1121 已提交
934
      if (taskDone < pTask->level->taskNum) {
L
Liu Jicong 已提交
935
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
D
dapan 已提交
936
        SCH_RET(errCode);
D
dapan1121 已提交
937 938 939
      }
    }
  } else {
D
dapan1121 已提交
940
    SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));
L
Liu Jicong 已提交
941

D
dapan 已提交
942 943
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
944

D
dapan1121 已提交
945 946
_return:

D
dapan1121 已提交
947
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
D
dapan1121 已提交
948 949
}

D
dapan1121 已提交
950
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
951
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
L
Liu Jicong 已提交
952
  bool    moved = false;
D
dapan1121 已提交
953 954
  int32_t code = 0;

D
dapan1121 已提交
955
  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
D
dapan 已提交
956

D
dapan1121 已提交
957
  SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
D
dapan1121 已提交
958

D
dapan1121 已提交
959
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
D
dapan1121 已提交
960

D
dapan1121 已提交
961
  SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
D
dapan1121 已提交
962 963

  SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
L
Liu Jicong 已提交
964

D
dapan1121 已提交
965
  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
D
dapan 已提交
966
  if (parentNum == 0) {
L
Liu Jicong 已提交
967
    int32_t taskDone = 0;
D
dapan1121 已提交
968
    if (SCH_IS_WAIT_ALL_JOB(pJob)) {
D
dapan1121 已提交
969 970 971 972
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
L
Liu Jicong 已提交
973

D
dapan1121 已提交
974
      if (taskDone < pTask->level->taskNum) {
S
Shengliang Guan 已提交
975
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
D
dapan1121 已提交
976
        return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
977
      } else if (taskDone > pTask->level->taskNum) {
D
dapan1121 已提交
978
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
D
dapan1121 已提交
979 980
      }

D
dapan1121 已提交
981
      if (pTask->level->taskFailed > 0) {
D
dapan1121 已提交
982 983 984
        SCH_RET(schProcessOnJobFailure(pJob, 0));
      } else {
        SCH_RET(schProcessOnJobPartialSuccess(pJob));
D
dapan1121 已提交
985 986
      }
    } else {
D
dapan1121 已提交
987
      pJob->resNode = pTask->succeedAddr;
D
dapan1121 已提交
988
    }
D
dapan 已提交
989

D
dapan1121 已提交
990
    pJob->fetchTask = pTask;
D
dapan1121 已提交
991

D
dapan1121 已提交
992
    SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved));
L
Liu Jicong 已提交
993

D
dapan1121 已提交
994
    SCH_RET(schProcessOnJobPartialSuccess(pJob));
D
dapan 已提交
995 996
  }

L
Liu Jicong 已提交
997 998 999 1000
  /*
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
      strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
      job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
D
dapan 已提交
1001

L
Liu Jicong 已提交
1002 1003 1004
      ++job->dataSrcEps.numOfEps;
    }
  */
D
dapan 已提交
1005

D
dapan 已提交
1006
  for (int32_t i = 0; i < parentNum; ++i) {
D
dapan1121 已提交
1007
    SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
L
Liu Jicong 已提交
1008
    int32_t   readyNum = atomic_add_fetch_32(&par->childReady, 1);
D
dapan 已提交
1009

D
dapan1121 已提交
1010
    SCH_LOCK(SCH_WRITE, &par->lock);
L
Liu Jicong 已提交
1011 1012 1013 1014
    SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
                                    .taskId = pTask->taskId,
                                    .schedId = schMgmt.sId,
                                    .addr = pTask->succeedAddr};
X
Xiaoyu Wang 已提交
1015
    qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
D
dapan1121 已提交
1016
    SCH_UNLOCK(SCH_WRITE, &par->lock);
L
Liu Jicong 已提交
1017

D
dapan 已提交
1018
    if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
D
dapan1121 已提交
1019
      SCH_ERR_RET(schLaunchTaskImpl(pJob, par));
D
dapan 已提交
1020 1021 1022 1023 1024
    }
  }

  return TSDB_CODE_SUCCESS;

D
dapan1121 已提交
1025
_return:
D
dapan 已提交
1026

D
dapan1121 已提交
1027 1028
  SCH_RET(schProcessOnJobFailure(pJob, code));
}
D
dapan 已提交
1029

D
dapan1121 已提交
1030 1031 1032
/*
 * Send a fetch request for the job's fetch task.
 *
 * At most one fetch is in flight at a time, guarded by remoteFetch (0 idle,
 * 1 busy). Nothing is sent, and the slot is released, if a result is already
 * present in resData.
 *
 * A send failure releases the slot and is routed through
 * schProcessOnTaskFailure. Callers do no further error processing.
 */
int32_t schFetchFromRemote(SSchJob *pJob) {
  int32_t code = 0;

  bool slotTaken = (0 != atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1));
  if (slotTaken) {
    SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
    return TSDB_CODE_SUCCESS;
  }

  void *existingRes = atomic_load_ptr(&pJob->resData);
  if (NULL != existingRes) {
    atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

    SCH_JOB_DLOG("res already fetched, res:%p", existingRes);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));

  return TSDB_CODE_SUCCESS;

_return:
  atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}

D
dapan1121 已提交
1058 1059
/*
 * Publish a completed explain result.
 *
 * The row count comes from pRsp->numOfRows (network byte order, converted
 * with htonl). pRsp is stored as the job's resData, the task is marked
 * SUCCEED, and the fetch waiter is released.
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  int32_t rowNum = (int32_t)htonl(pRsp->numOfRows);

  SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", rowNum, pRsp->completed);

  atomic_store_32(&pJob->resNumOfRows, rowNum);
  atomic_store_ptr(&pJob->resData, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);

  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1071
// Note: no more task error processing, handled in function internal
L
Liu Jicong 已提交
1072 1073
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
                             int32_t rspCode) {
D
dapan1121 已提交
1074
  int32_t code = 0;
L
Liu Jicong 已提交
1075 1076
  int8_t  status = 0;

D
dapan1121 已提交
1077
  if (schJobNeedToStop(pJob, &status)) {
L
Liu Jicong 已提交
1078 1079
    SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
                  rspCode);
D
dapan1121 已提交
1080 1081
    SCH_RET(atomic_load_32(&pJob->errCode));
  }
H
Haojun Liao 已提交
1082

D
dapan1121 已提交
1083 1084
  SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));

D
dapan1121 已提交
1085
  switch (msgType) {
H
Haojun Liao 已提交
1086
    case TDMT_VND_CREATE_TABLE_RSP: {
X
Xiaoyu Wang 已提交
1087 1088
      SVCreateTbBatchRsp batchRsp = {0};
      if (msg) {
H
Hongze Cheng 已提交
1089 1090
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
1091 1092 1093 1094
        code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
            SVCreateTbRsp *rsp = batchRsp.pRsps + i;
X
Xiaoyu Wang 已提交
1095
            if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
H
Hongze Cheng 已提交
1096
              tDecoderClear(&coder);
X
Xiaoyu Wang 已提交
1097
              SCH_ERR_JRET(rsp->code);
1098 1099
            } else if (TSDB_CODE_SUCCESS != rsp->code) {
              code = rsp->code;
D
dapan 已提交
1100 1101
            }
          }
D
dapan1121 已提交
1102
        }
H
Hongze Cheng 已提交
1103
        tDecoderClear(&coder);
1104
        SCH_ERR_JRET(code);
L
Liu Jicong 已提交
1105 1106
      }

L
Liu Jicong 已提交
1107 1108 1109 1110
      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
X
Xiaoyu Wang 已提交
1111 1112 1113
    case TDMT_VND_DROP_TABLE_RSP: {
      SVDropTbBatchRsp batchRsp = {0};
      if (msg) {
H
Hongze Cheng 已提交
1114 1115
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
X
Xiaoyu Wang 已提交
1116
        code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
1117 1118 1119
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
            SVDropTbRsp *rsp = batchRsp.pRsps + i;
X
Xiaoyu Wang 已提交
1120
            if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
H
Hongze Cheng 已提交
1121
              tDecoderClear(&coder);
X
Xiaoyu Wang 已提交
1122
              SCH_ERR_JRET(rsp->code);
1123 1124
            } else if (TSDB_CODE_SUCCESS != rsp->code) {
              code = rsp->code;
X
Xiaoyu Wang 已提交
1125 1126 1127
            }
          }
        }
H
Hongze Cheng 已提交
1128
        tDecoderClear(&coder);
X
Xiaoyu Wang 已提交
1129 1130 1131 1132 1133 1134 1135
        SCH_ERR_JRET(code);
      }

      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
D
dapan1121 已提交
1136
    case TDMT_VND_SUBMIT_RSP: {
L
Liu Jicong 已提交
1137
      SCH_ERR_JRET(rspCode);
D
dapan1121 已提交
1138

D
dapan 已提交
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169
      if (msg) {
        SDecoder coder = {0};
        SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp));
        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSSubmitRsp(&coder, rsp);
        if (code) {
          SCH_TASK_ELOG("decode submitRsp failed, code:%d", code);
          tFreeSSubmitRsp(rsp);
          SCH_ERR_JRET(code);
        }
        
        atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
        SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
        
        if (pJob->attr.needRes) {
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
          if (pJob->resData) {
            SSubmitRsp *sum = pJob->resData;
            sum->affectedRows += rsp->affectedRows;
            sum->nBlocks += rsp->nBlocks;
            sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
            memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
            taosMemoryFree(rsp->pBlocks);
            taosMemoryFree(rsp);
          } else {        
            pJob->resData = rsp;
          }
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
        } else {
          tFreeSSubmitRsp(rsp);
        }
L
Liu Jicong 已提交
1170
      }
D
dapan1121 已提交
1171

L
Liu Jicong 已提交
1172
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
D
dapan1121 已提交
1173

L
Liu Jicong 已提交
1174 1175
      break;
    }
D
dapan1121 已提交
1176
    case TDMT_VND_QUERY_RSP: {
L
Liu Jicong 已提交
1177 1178
      SQueryTableRsp rsp = {0};
      if (msg) {
D
dapan1121 已提交
1179
        SCH_ERR_JRET(tDeserializeSQueryTableRsp(msg, msgSize, &rsp));
L
Liu Jicong 已提交
1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191
        SCH_ERR_JRET(rsp.code);
      }

      SCH_ERR_JRET(rspCode);

      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      // SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));

      break;
L
Liu Jicong 已提交
1192
    }
D
dapan1121 已提交
1193
    case TDMT_VND_RES_READY_RSP: {
L
Liu Jicong 已提交
1194 1195 1196 1197 1198
      SResReadyRsp *rsp = (SResReadyRsp *)msg;

      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
1199
      }
L
Liu Jicong 已提交
1200 1201
      SCH_ERR_JRET(rsp->code);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
D
dapan1121 已提交
1202

L
Liu Jicong 已提交
1203 1204
      break;
    }
D
dapan1121 已提交
1205 1206 1207 1208 1209
    case TDMT_VND_EXPLAIN_RSP: {
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
H
Hongze Cheng 已提交
1210

D
dapan1121 已提交
1211 1212 1213 1214 1215 1216 1217 1218 1219 1220
      if (!SCH_IS_EXPLAIN_JOB(pJob)) {
        SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (pJob->resData) {
        SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }

D
dapan1121 已提交
1221 1222 1223 1224 1225 1226
      SExplainRsp rsp = {0};
      if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
        taosMemoryFree(rsp.subplanInfo);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

D
dapan1121 已提交
1227
      SRetrieveTableRsp *pRsp = NULL;
D
dapan1121 已提交
1228
      SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));
D
dapan1121 已提交
1229 1230

      if (pRsp) {
D
dapan1121 已提交
1231
        SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
D
dapan1121 已提交
1232 1233 1234
      }
      break;
    }
L
Liu Jicong 已提交
1235 1236
    case TDMT_VND_FETCH_RSP: {
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
D
dapan1121 已提交
1237

L
Liu Jicong 已提交
1238 1239 1240 1241
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
D
dapan1121 已提交
1242

D
dapan1121 已提交
1243
      if (SCH_IS_EXPLAIN_JOB(pJob)) {
H
Hongze Cheng 已提交
1244
        if (rsp->completed) {
D
dapan1121 已提交
1245 1246 1247 1248 1249
          SRetrieveTableRsp *pRsp = NULL;
          SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
          if (pRsp) {
            SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
          }
H
Hongze Cheng 已提交
1250

D
dapan1121 已提交
1251 1252 1253
          return TSDB_CODE_SUCCESS;
        }

D
dapan1121 已提交
1254 1255
        atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

D
dapan1121 已提交
1256 1257 1258 1259 1260
        SCH_ERR_JRET(schFetchFromRemote(pJob));

        return TSDB_CODE_SUCCESS;
      }

X
Xiaoyu Wang 已提交
1261 1262
      if (pJob->resData) {
        SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData);
wafwerar's avatar
wafwerar 已提交
1263
        taosMemoryFreeClear(rsp);
L
Liu Jicong 已提交
1264 1265
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }
H
Haojun Liao 已提交
1266

X
Xiaoyu Wang 已提交
1267
      atomic_store_ptr(&pJob->resData, rsp);
L
Liu Jicong 已提交
1268
      atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
D
dapan1121 已提交
1269

L
Liu Jicong 已提交
1270 1271
      if (rsp->completed) {
        SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
D
dapan1121 已提交
1272
      }
L
Liu Jicong 已提交
1273 1274 1275 1276 1277 1278

      SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

      schProcessOnDataFetched(pJob);
      break;
    }
D
dapan1121 已提交
1279
    case TDMT_VND_DROP_TASK_RSP: {
L
Liu Jicong 已提交
1280 1281 1282 1283 1284
      // SHOULD NEVER REACH HERE
      SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
    }
D
dapan1121 已提交
1285 1286 1287 1288
    case TDMT_SCH_LINK_BROKEN:
      SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
      break;
D
dapan1121 已提交
1289
    default:
D
dapan1121 已提交
1290
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
D
dapan1121 已提交
1291
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
1292 1293 1294 1295 1296
  }

  return TSDB_CODE_SUCCESS;

_return:
D
dapan1121 已提交
1297

D
dapan1121 已提交
1298
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
D
dapan1121 已提交
1299 1300
}

D
dapan1121 已提交
1301
/*
 * Look up taskId in pTaskList. The hash values are SSchTask pointers.
 *
 * *pTask is written only when a non-NULL entry is found; otherwise it keeps
 * the caller's value. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
  int32_t entryNum = taosHashGetSize(pTaskList);
  if (entryNum > 0) {
    SSchTask **slot = taosHashGet(pTaskList, &taskId, sizeof(taskId));
    if (slot != NULL && *slot != NULL) {
      *pTask = *slot;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1317
/*
 * Store the connection handle on the task's exec node.
 *
 * Applies only when rspCode is 0 and the task has exactly one entry in
 * execNodes; otherwise the call has no effect. Always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
  bool singleNode = (NULL != pTask->execNodes) && (1 == taosArrayGetSize(pTask->execNodes));

  if (0 == rspCode && singleNode) {
    SSchNodeInfo *onlyNode = taosArrayGet(pTask->execNodes, 0);
    onlyNode->handle = handle;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1329
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
L
Liu Jicong 已提交
1330
  int32_t                code = 0;
D
dapan1121 已提交
1331
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
X
Xiaoyu Wang 已提交
1332
  SSchTask              *pTask = NULL;
L
Liu Jicong 已提交
1333

D
dapan1121 已提交
1334
  SSchJob *pJob = schAcquireJob(pParam->refId);
D
dapan1121 已提交
1335
  if (NULL == pJob) {
D
dapan1121 已提交
1336
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64,
L
Liu Jicong 已提交
1337
          pParam->queryId, pParam->taskId, pParam->refId);
D
dapan1121 已提交
1338
    SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
1339 1340
  }

D
dapan1121 已提交
1341 1342 1343 1344 1345
  schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
  if (NULL == pTask) {
    if (TDMT_VND_EXPLAIN_RSP == msgType) {
      schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
    } else {
H
Hongze Cheng 已提交
1346 1347
      SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                   pParam->taskId);
D
dapan1121 已提交
1348 1349
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
D
dapan1121 已提交
1350
  }
H
Hongze Cheng 已提交
1351

D
dapan1121 已提交
1352
  if (NULL == pTask) {
H
Hongze Cheng 已提交
1353 1354
    SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                 pParam->taskId);
D
dapan1121 已提交
1355 1356
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }
D
dapan1121 已提交
1357

D
dapan1121 已提交
1358
  SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
L
Liu Jicong 已提交
1359

L
Liu Jicong 已提交
1360
  SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
D
dapan1121 已提交
1361
  schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
D
dapan1121 已提交
1362
  SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
D
dapan1121 已提交
1363

H
Haojun Liao 已提交
1364
_return:
D
dapan1121 已提交
1365
  if (pJob) {
D
dapan1121 已提交
1366
    schReleaseJob(pParam->refId);
D
dapan1121 已提交
1367 1368
  }

wafwerar's avatar
wafwerar 已提交
1369
  taosMemoryFreeClear(param);
D
dapan1121 已提交
1370 1371 1372
  SCH_RET(code);
}

L
Liu Jicong 已提交
1373
int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1374 1375
  return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
H
Haojun Liao 已提交
1376

L
Liu Jicong 已提交
1377
int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
H
Haojun Liao 已提交
1378 1379 1380
  return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}

X
Xiaoyu Wang 已提交
1381 1382 1383 1384
int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code);
}

L
Liu Jicong 已提交
1385
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1386 1387
  return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
H
Haojun Liao 已提交
1388

L
Liu Jicong 已提交
1389
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1390 1391
  return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
H
Haojun Liao 已提交
1392

L
Liu Jicong 已提交
1393
int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1394 1395
  return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
H
Haojun Liao 已提交
1396

D
dapan1121 已提交
1397 1398 1399 1400
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_EXPLAIN_RSP, code);
}

L
Liu Jicong 已提交
1401
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1402
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
L
Liu Jicong 已提交
1403
  qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
1404
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1405 1406
}

L
Liu Jicong 已提交
1407
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1408 1409 1410 1411
  if (code) {
    qError("hb rsp error:%s", tstrerror(code));
    SCH_ERR_RET(code);
  }
L
Liu Jicong 已提交
1412

D
dapan1121 已提交
1413 1414 1415 1416 1417 1418
  SSchedulerHbRsp rsp = {0};
  if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
    qError("invalid hb rsp msg, size:%d", pMsg->len);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

D
dapan1121 已提交
1419 1420
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;

D
dapan1121 已提交
1421 1422 1423
  SSchTrans trans = {0};
  trans.transInst = pParam->transport;
  trans.transHandle = pMsg->handle;
L
Liu Jicong 已提交
1424

D
dapan1121 已提交
1425
  SCH_ERR_RET(schUpdateHbConnection(&rsp.epId, &trans));
D
dapan1121 已提交
1426 1427

  int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
L
Liu Jicong 已提交
1428 1429
  qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
         rsp.epId.ep.port);
D
dapan1121 已提交
1430

D
dapan1121 已提交
1431 1432
  for (int32_t i = 0; i < taskNum; ++i) {
    STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
L
Liu Jicong 已提交
1433

D
dapan1121 已提交
1434 1435
    SSchJob *pJob = schAcquireJob(taskStatus->refId);
    if (NULL == pJob) {
L
Liu Jicong 已提交
1436 1437 1438
      qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
            taskStatus->queryId, taskStatus->taskId);
      // TODO DROP TASK FROM SERVER!!!!
D
dapan1121 已提交
1439 1440
      continue;
    }
L
Liu Jicong 已提交
1441

D
dapan1121 已提交
1442
    // TODO
L
Liu Jicong 已提交
1443 1444 1445

    SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId,
                 jobTaskStatusStr(taskStatus->status));
D
dapan1121 已提交
1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456

    schReleaseJob(taskStatus->refId);
  }

_return:

  tFreeSSchedulerHbRsp(&rsp);

  SCH_RET(code);
}

D
dapan1121 已提交
1457 1458 1459 1460
int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
  rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);

D
dapan1121 已提交
1461 1462
  qDebug("handle %p is broken", pMsg->handle);

D
dapan1121 已提交
1463 1464
  if (head->isHbParam) {
    SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
L
Liu Jicong 已提交
1465
    SSchTrans            trans = {.transInst = hbParam->transport, .transHandle = NULL};
D
dapan1121 已提交
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475
    SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));

    SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId));
  } else {
    SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1476
/*
 * Look up the async-send response callback for a request message type.
 *
 * On success *fp receives the handler and TSDB_CODE_SUCCESS is returned.
 * An unknown msgType is logged and yields TSDB_CODE_QRY_APP_ERROR, with *fp
 * left untouched.
 */
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
  static const struct {
    int32_t              msgType;
    __async_send_cb_fn_t cb;
  } callbackTable[] = {
      {TDMT_VND_CREATE_TABLE, schHandleCreateTableCallback},
      {TDMT_VND_DROP_TABLE, schHandleDropTableCallback},
      {TDMT_VND_SUBMIT, schHandleSubmitCallback},
      {TDMT_VND_QUERY, schHandleQueryCallback},
      {TDMT_VND_RES_READY, schHandleReadyCallback},
      {TDMT_VND_EXPLAIN, schHandleExplainCallback},
      {TDMT_VND_FETCH, schHandleFetchCallback},
      {TDMT_VND_DROP_TASK, schHandleDropCallback},
      {TDMT_VND_QUERY_HEARTBEAT, schHandleHbCallback},
      {TDMT_SCH_LINK_BROKEN, schHandleLinkBrokenCallback},
  };

  for (size_t idx = 0; idx < sizeof(callbackTable) / sizeof(callbackTable[0]); ++idx) {
    if (callbackTable[idx].msgType == msgType) {
      *fp = callbackTable[idx].cb;
      return TSDB_CODE_SUCCESS;
    }
  }

  qError("unknown msg type for callback, msgType:%d", msgType);
  SCH_RET(TSDB_CODE_QRY_APP_ERROR);
}

D
dapan1121 已提交
1516
/*
 * Allocate an SMsgSendInfo for a task request of type msgType.
 *
 * The send info is paired with a freshly allocated SSchTaskCallbackParam
 * (query/ref/task ids plus the job's transport) and the callback returned by
 * schGetCallbackFp. On success ownership of both allocations passes to the
 * caller through *pMsgSendInfo. On failure both are freed and the error is
 * returned.
 */
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
  int32_t                code = 0;
  SSchTaskCallbackParam *cbParam = NULL;
  SMsgSendInfo          *sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));

  if (sendInfo == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  cbParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (cbParam == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  __async_send_cb_fn_t cbFp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(msgType, &cbFp));

  cbParam->transport = pJob->transport;
  cbParam->taskId = SCH_TASK_ID(pTask);
  cbParam->refId = pJob->refId;
  cbParam->queryId = pJob->queryId;

  sendInfo->fp = cbFp;
  sendInfo->param = cbParam;

  *pMsgSendInfo = sendInfo;
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFree(cbParam);
  taosMemoryFree(sendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1553
void schFreeRpcCtxVal(const void *arg) {
D
dapan1121 已提交
1554 1555 1556
  if (NULL == arg) {
    return;
  }
L
Liu Jicong 已提交
1557 1558

  SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
D
dapan1121 已提交
1559 1560
  taosMemoryFreeClear(pMsgSendInfo->param);
  taosMemoryFreeClear(pMsgSendInfo);
D
dapan1121 已提交
1561
}
D
dapan1121 已提交
1562

D
dapan1121 已提交
1563
/*
 * Allocate an SSchTaskCallbackParam describing pTask within pJob.
 * The param is returned through *pParam, and the caller owns it.
 */
int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchTaskCallbackParam *cbParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (cbParam == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  cbParam->transport = pJob->transport;
  cbParam->taskId = SCH_TASK_ID(pTask);
  cbParam->refId = pJob->refId;
  cbParam->queryId = pJob->queryId;

  *pParam = cbParam;
  return TSDB_CODE_SUCCESS;
}

/*
 * Allocate an SSchHbCallbackParam for the task's current candidate node.
 *
 * The param records the node id, the node's current endpoint and the job's
 * transport, and is flagged with head.isHbParam. It is returned through
 * *pParam, and the caller owns it.
 */
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchHbCallbackParam *hbParam = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (hbParam == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SQueryNodeAddr *curAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  hbParam->head.isHbParam = true;
  hbParam->nodeEpId.nodeId = curAddr->nodeId;
  memcpy(&hbParam->nodeEpId.ep, SCH_GET_CUR_EP(curAddr), sizeof(SEp));
  hbParam->transport = pJob->transport;

  *pParam = hbParam;
  return TSDB_CODE_SUCCESS;
}

/*
 * Fill brokenVal with the handler the transport uses when the link breaks.
 *
 * The value is an SMsgSendInfo whose callback is the TDMT_SCH_LINK_BROKEN
 * handler. Its param is a heartbeat param when isHb is true and a task param
 * otherwise. The clone/free hooks are schCloneSMsgSendInfo/schFreeRpcCtxVal.
 *
 * brokenVal is only written on success. On failure every allocation made here
 * is released and the error is returned.
 */
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
  int32_t       code = 0;
  SMsgSendInfo *pMsgSendInfo = NULL;

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (isHb) {
    SCH_ERR_JRET(schMakeHbCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  } else {
    SCH_ERR_JRET(schMakeTaskCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  }

  int32_t              msgType = TDMT_SCH_LINK_BROKEN;
  __async_send_cb_fn_t fp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  pMsgSendInfo->fp = fp;

  brokenVal->msgType = msgType;
  brokenVal->val = pMsgSendInfo;
  brokenVal->clone = schCloneSMsgSendInfo;
  brokenVal->freeFunc = schFreeRpcCtxVal;

  return TSDB_CODE_SUCCESS;

_return:

  // pMsgSendInfo is still NULL when its own allocation failed; the original
  // code dereferenced it unconditionally here.
  if (pMsgSendInfo) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1637
/*
 * Build the RPC context attached to a TDMT_VND_QUERY request.
 *
 * pCtx->args maps response types to SRpcCtxVal entries:
 * - TDMT_VND_RES_READY_RSP -> SMsgSendInfo for the res-ready callback.
 * - TDMT_VND_EXPLAIN_RSP   -> SMsgSendInfo for the explain callback.
 * pCtx->brokenVal is filled with a task-level link-broken handler.
 *
 * On failure the hash table and both send infos are released, and pCtx->args
 * is reset to NULL so the caller is not left holding a freed table.
 */
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t       code = 0;
  SMsgSendInfo *pReadyMsgSendInfo = NULL;
  SMsgSendInfo *pExplainMsgSendInfo = NULL;

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo));
  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));

  int32_t    msgType = TDMT_VND_RES_READY_RSP;
  SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Same clone/free hooks; only the key and the value pointer change.
  msgType = TDMT_VND_EXPLAIN_RSP;
  ctxVal.val = pExplainMsgSendInfo;
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(pCtx->args);
  pCtx->args = NULL;  // don't leave a dangling pointer in the caller's struct

  if (pReadyMsgSendInfo) {
    taosMemoryFreeClear(pReadyMsgSendInfo->param);
    taosMemoryFreeClear(pReadyMsgSendInfo);
  }

  if (pExplainMsgSendInfo) {
    taosMemoryFreeClear(pExplainMsgSendInfo->param);
    taosMemoryFreeClear(pExplainMsgSendInfo);
  }

  SCH_RET(code);
}

/*
 * Build the RPC context attached to a TDMT_VND_QUERY_HEARTBEAT request.
 *
 * pCtx->args maps TDMT_VND_QUERY_HEARTBEAT_RSP to an SMsgSendInfo holding the
 * heartbeat callback. That callback's SSchHbCallbackParam carries the task's
 * current candidate node (id + current endpoint) and the job's transport.
 * pCtx->brokenVal is filled with a heartbeat link-broken handler.
 *
 * On failure the hash table, send info and param are released, and
 * pCtx->args is reset to NULL so the caller is not left holding a freed table.
 */
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t              code = 0;
  SSchHbCallbackParam *param = NULL;
  SMsgSendInfo        *pMsgSendInfo = NULL;
  SQueryNodeAddr      *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId       epId = {0};

  epId.nodeId = addr->nodeId;
  memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (NULL == param) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Keyed by the response type; the callback is looked up by the request type.
  int32_t              msgType = TDMT_VND_QUERY_HEARTBEAT_RSP;
  __async_send_cb_fn_t fp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));

  param->nodeEpId = epId;
  param->transport = pJob->transport;

  pMsgSendInfo->param = param;
  pMsgSendInfo->fp = fp;

  SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(pCtx->args);
  pCtx->args = NULL;  // don't leave a dangling pointer in the caller's struct

  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1743
/*
 * Register a heartbeat connection for epId in schMgmt.hbConnections.
 *
 * A new SSchHbTrans is built with the job's transport and a fresh heartbeat
 * RPC context. If the hash insert fails, that context is freed again:
 * - If another entry already exists, *exist is set to true and success is
 *   returned.
 * - Any other insert error is logged and returned.
 */
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  SSchHbTrans hb = {0};
  hb.trans.transInst = pJob->transport;

  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));

  int32_t code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
  if (code == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // The entry was not stored, so the context built above is still ours to free.
  schFreeRpcCtx(&hb.rpcCtx);

  if (HASH_NODE_EXIST(code)) {
    *exist = true;
    return TSDB_CODE_SUCCESS;
  }

  qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
  SCH_RET(code);
}

/*
 * Shallow-copy a callback param into a new heap allocation.
 *
 * pSrc->isHbParam selects the concrete type: SSchHbCallbackParam or
 * SSchTaskCallbackParam. The copy is returned through *pDst, and the caller
 * owns it.
 */
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
  void  *copy = NULL;
  size_t copyLen = 0;

  if (pSrc->isHbParam) {
    copyLen = sizeof(SSchHbCallbackParam);
    copy = taosMemoryMalloc(copyLen);
    if (copy == NULL) {
      qError("malloc SSchHbCallbackParam failed");
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  } else {
    copyLen = sizeof(SSchTaskCallbackParam);
    copy = taosMemoryMalloc(copyLen);
    if (copy == NULL) {
      qError("malloc SSchTaskCallbackParam failed");
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  memcpy(copy, pSrc, copyLen);
  *pDst = (SSchCallbackParamHeader *)copy;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1793 1794
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
  SMsgSendInfo *pSrc = src;
L
Liu Jicong 已提交
1795
  int32_t       code = 0;
D
dapan1121 已提交
1796
  SMsgSendInfo *pDst = taosMemoryMalloc(sizeof(*pSrc));
D
dapan1121 已提交
1797
  if (NULL == pDst) {
D
dapan1121 已提交
1798 1799 1800 1801
    qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1802 1803
  memcpy(pDst, pSrc, sizeof(*pSrc));
  pDst->param = NULL;
D
dapan1121 已提交
1804

D
dapan1121 已提交
1805
  SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
D
dapan1121 已提交
1806

D
dapan1121 已提交
1807
  *dst = pDst;
D
dapan1121 已提交
1808

D
dapan1121 已提交
1809
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1810

D
dapan1121 已提交
1811 1812
_return:

D
dapan1121 已提交
1813
  taosMemoryFreeClear(pDst);
D
dapan1121 已提交
1814 1815 1816 1817 1818 1819 1820
  SCH_RET(code);
}

/*
 * Deep-copy a heartbeat RPC context from pSrc into pDst.
 *
 * The link-broken value and every SRpcCtxVal in pSrc->args are cloned with
 * schCloneSMsgSendInfo into a new hash table. On failure after the broken-link
 * clone succeeded, pDst is released with schFreeRpcCtx before the error is
 * returned.
 */
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
  int32_t code = 0;

  pDst->brokenVal = pSrc->brokenVal;
  pDst->brokenVal.val = NULL;
  SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));

  pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (pDst->args == NULL) {
    qError("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  for (void *iter = taosHashIterate(pSrc->args, NULL); iter != NULL; iter = taosHashIterate(pSrc->args, iter)) {
    SRpcCtxVal *srcVal = (SRpcCtxVal *)iter;
    int32_t    *msgType = taosHashGetKey(iter, NULL);

    SRpcCtxVal cloned = *srcVal;
    cloned.val = NULL;
    SCH_ERR_JRET(schCloneSMsgSendInfo(srcVal->val, &cloned.val));

    if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &cloned, sizeof(cloned))) {
      qError("taosHashPut msg %d to rpcCtx failed", *msgType);
      // Not in the table yet, so schFreeRpcCtx would not see it; free it here.
      (*cloned.freeFunc)(cloned.val);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  schFreeRpcCtx(pDst);
  SCH_RET(code);
}

L
Liu Jicong 已提交
1858 1859
/*
 * Send an already-built request for pTask asynchronously.
 *
 * transport points to an SSchTrans (instance + connection handle). The message
 * buffer and size are wrapped in an SMsgSendInfo carrying the task callback
 * for msgType, then handed to asyncSendMsgToServerExt together with epSet,
 * persistHandle and ctx. If generating the send info or sending fails, the
 * send info and its param are freed here and the error is returned.
 */
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet *epSet, int32_t msgType, void *msg,
                        uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
  int32_t       code = 0;
  SSchTrans    *pTrans = (SSchTrans *)transport;
  SMsgSendInfo *pMsgSendInfo = NULL;

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, msgType, &pMsgSendInfo));

  pMsgSendInfo->msgType = msgType;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = pTrans->transHandle;

  qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", TMSG_INFO(msgType),
         ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
         pTrans->transInst, pTrans->transHandle);

  int64_t transporterId = 0;
  SCH_ERR_JRET(asyncSendMsgToServerExt(pTrans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx));

  SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType));
  return TSDB_CODE_SUCCESS;

_return:

  if (pMsgSendInfo != NULL) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1895 1896
/*
 * Build and send a TDMT_VND_QUERY_HEARTBEAT request to the node nodeEpId.
 *
 * The node's registered SSchHbTrans is looked up in schMgmt.hbConnections.
 * Under that entry's lock, its RPC context is cloned and its transport
 * (instance + handle) is copied. The request is then serialized and sent with
 * a persistent handle.
 *
 * If no entry is registered for the node, an error is logged and success is
 * returned: there is no connection to heartbeat on. Previously SCH_ERR_RET(0)
 * fell through and dereferenced the NULL entry.
 *
 * On any failure after cloning, the message buffer, send info, callback param
 * and cloned RPC context are all released.
 */
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
  SSchedulerHbReq req = {0};
  int32_t         code = 0;
  SRpcCtx         rpcCtx = {0};
  SSchTrans       trans = {0};
  int32_t         msgType = TDMT_VND_QUERY_HEARTBEAT;
  // Declared and NULL-initialised up front: error paths jump to _return before
  // these are assigned, and freeing an indeterminate pointer is undefined.
  void                  *msg = NULL;
  SMsgSendInfo          *pMsgSendInfo = NULL;
  SSchTaskCallbackParam *param = NULL;

  req.header.vgId = nodeEpId->nodeId;
  req.sId = schMgmt.sId;
  memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));

  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
           nodeEpId->ep.port);
    // NOTE(review): success is returned because there is nothing to send;
    // confirm whether callers should instead observe an error here.
    return TSDB_CODE_SUCCESS;
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
  memcpy(&trans, &hb->trans, sizeof(trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  // schCloneHbRpcCtx releases its partial result itself on failure.
  SCH_ERR_RET(code);

  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    qError("calloc hb req %d failed", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    qError("calloc SMsgSendInfo failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == param) {
    qError("calloc SSchTaskCallbackParam failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  __async_send_cb_fn_t fp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  param->transport = trans.transInst;

  pMsgSendInfo->param = param;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans.transHandle;
  pMsgSendInfo->msgType = msgType;
  pMsgSendInfo->fp = fp;

  int64_t transporterId = 0;
  SEpSet  epSet = {.inUse = 0, .numOfEps = 1};
  memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));

  qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
         nodeEpId->ep.fqdn, nodeEpId->ep.port);

  code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
  if (code) {
    qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
           trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
    SCH_ERR_JRET(code);
  }

  qDebug("hb msg sent");
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFreeClear(msg);
  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);
  schFreeRpcCtx(&rpcCtx);
  SCH_RET(code);
}

D
dapan1121 已提交
1986
/*
 * Build the request of type msgType for pTask and send it to addr.
 *
 * addr: target node; when NULL, the task's current candidate address is used.
 *
 * Per message type:
 * - CREATE_TABLE / DROP_TABLE / SUBMIT: the task's prebuilt pTask->msg is copied.
 * - QUERY: an SSubQueryMsg holds the SQL text followed by the serialized plan;
 *   a query RPC context is attached and the handle is persisted.
 * - RES_READY / FETCH / DROP_TASK: small fixed-size requests.
 * - QUERY_HEARTBEAT: a serialized SSchedulerHbReq; a heartbeat RPC context is
 *   attached and the handle is persisted.
 *
 * Every failure after an RPC context has been built goes through _return. That
 * path frees the context and the message buffer and resets the task's
 * last-message type to -1. Previously some allocation failures returned
 * directly and leaked the context.
 */
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
  uint32_t msgSize = 0;
  void    *msg = NULL;
  int32_t  code = 0;
  bool     persistHandle = false;
  SRpcCtx  rpcCtx = {0};

  if (NULL == addr) {
    addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  }

  SEpSet epSet = addr->epSet;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_SUBMIT: {
      msgSize = pTask->msgLen;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      memcpy(msg, pTask->msg, msgSize);
      break;
    }

    case TDMT_VND_QUERY: {
      SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));

      uint32_t len = strlen(pJob->sql);
      msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        // rpcCtx already exists: release it via _return instead of leaking it.
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSubQueryMsg *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      pMsg->taskType = TASK_TYPE_TEMP;
      pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
      pMsg->phyLen = htonl(pTask->msgLen);
      pMsg->sqlLen = htonl(len);

      // Payload layout: SQL text, then the serialized physical plan.
      memcpy(pMsg->msg, pJob->sql, len);
      memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);

      persistHandle = true;
      break;
    }

    case TDMT_VND_RES_READY: {
      msgSize = sizeof(SResReadyReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResReadyReq *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      break;
    }
    case TDMT_VND_FETCH: {
      msgSize = sizeof(SResFetchReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResFetchReq *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      break;
    }
    case TDMT_VND_DROP_TASK: {
      msgSize = sizeof(STaskDropReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      STaskDropReq *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      break;
    }
    case TDMT_VND_QUERY_HEARTBEAT: {
      SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));

      SSchedulerHbReq req = {0};
      req.sId = schMgmt.sId;
      req.header.vgId = addr->nodeId;
      req.epId.nodeId = addr->nodeId;
      memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

      // The serializer reports failure as a negative length. Check it in a signed
      // variable: msgSize is unsigned, so the old `msgSize < 0` test never fired.
      int32_t hbLen = tSerializeSSchedulerHbReq(NULL, 0, &req);
      if (hbLen < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", hbLen);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgSize = (uint32_t)hbLen;

      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_JOB_ELOG("calloc %d failed", msgSize);
        // rpcCtx already exists: release it via _return instead of leaking it.
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      persistHandle = true;
      break;
    }
    default:
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);

  SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
  SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
                               (rpcCtx.args ? &rpcCtx : NULL)));

  // The request is in flight; msg and rpcCtx now belong to the send path,
  // so this failure must not go through _return.
  if (msgType == TDMT_VND_QUERY) {
    SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
  schFreeRpcCtx(&rpcCtx);

  taosMemoryFreeClear(msg);
  SCH_RET(code);
}
D
dapan1121 已提交
2151

D
dapan1121 已提交
2152 2153
/*
 * Make sure a heartbeat connection exists for the task's current candidate node.
 *
 * If schMgmt.hbConnections has no entry for the node, one is registered. The
 * first heartbeat is sent only when this call's registration is the one that
 * inserted the entry, i.e. no concurrent registration won.
 */
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *curAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  epId = {.nodeId = curAddr->nodeId};
  memcpy(&epId.ep, SCH_GET_CUR_EP(curAddr), sizeof(SEp));

  if (taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool alreadyRegistered = false;
  SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &alreadyRegistered));

  if (!alreadyRegistered) {
    SCH_ERR_RET(schBuildAndSendHbMsg(&epId));
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
2172

D
dapan1121 已提交
2173
/*
 * Launch a single task.
 *
 * Steps:
 * 1. Count the launch on the task's level.
 * 2. Bail out with the job's error code if the job should stop.
 * 3. Move the task to EXECUTING (registering it in the exec list first).
 * 4. Serialize its subplan if pTask->msg is not yet set.
 * 5. Resolve candidate addresses; for query jobs, ensure a heartbeat connection.
 * 6. Send the plan's message.
 */
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  int8_t  jobStatus = 0;

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(jobStatus));
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  // NOTE: race condition: the task should be put into the hash table before send msg to server
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
    SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
  }

  SSubplan *plan = pTask->plan;

  // Serialize the plan only when no message is cached yet; otherwise reuse pTask->msg.
  if (pTask->msg == NULL) {  // TODO add more detailed reason for failure
    code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
    if (code != TSDB_CODE_SUCCESS) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }

    SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));

  return TSDB_CODE_SUCCESS;
}

/*
 * Launch a task, going through the level's flow-control quota when the job needs it.
 *
 * Note: no more error processing is needed by callers. Any failure is handed to
 * schProcessOnTaskFailure() here and its result is what gets returned. When flow
 * control reports that the quota is not enough, the task is not launched now and
 * TSDB_CODE_SUCCESS is returned.
 */
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  bool    enough = false;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
  if (enough) {
    SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;

_return:
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

/*
 * Launch every task of `level` in array order.
 * Stops and returns at the first task whose launch fails.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  int32_t idx = 0;

  while (idx < level->taskNum) {
    SSchTask *pTask = taosArrayGet(level->subTasks, idx);
    SCH_ERR_RET(schLaunchTask(pJob, pTask));
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Start a job.
 *
 * Switches the job status to EXECUTING, runs schCheckJobNeedFlowCtrl for the current
 * level (pJob->levelIdx) and launches that level's tasks. Returns the first error.
 */
int32_t schLaunchJob(SSchJob *pJob) {
  SSchLevel *pCurLevel = taosArrayGet(pJob->levels, pJob->levelIdx);

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));
  SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, pCurLevel));
  SCH_ERR_RET(schLaunchLevelTasks(pJob, pCurLevel));

  return TSDB_CODE_SUCCESS;
}

/*
 * Send a TDMT_VND_DROP_TASK message for the task to every node recorded in
 * pTask->execNodes, using that node's handle as the task handle.
 * The return value of schBuildAndSendMsg() is not checked here.
 */
void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->execNodes == NULL) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosArrayGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  for (int32_t idx = 0; idx < nodeNum; ++idx) {
    SSchNodeInfo *pNode = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, idx);

    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_VND_DROP_TASK);
  }

  SCH_TASK_DLOG("task has %d exec address", nodeNum);
}

/*
 * Drop, on their executed nodes, all tasks stored in `list` (a hash whose values are
 * SSchTask pointers). Does nothing unless SCH_IS_NEED_DROP_JOB(pJob) holds.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_IS_NEED_DROP_JOB(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); pIter != NULL; pIter = taosHashIterate(list, pIter)) {
    SSchTask *pTask = *(SSchTask **)pIter;
    schDropTaskOnExecutedNode(pJob, pTask);
  }
}

/* Drop the tasks of all three task lists of the job: executing, succeeded, failed. */
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *taskLists[] = {pJob->execTasks, pJob->succTasks, pJob->failTasks};

  for (size_t idx = 0; idx < sizeof(taskLists) / sizeof(taskLists[0]); ++idx) {
    schDropTaskInHashList(pJob, taskLists[idx]);
  }
}

/*
 * Cancel a running job.
 * Not implemented yet: always returns TSDB_CODE_SUCCESS without touching the job.
 */
int32_t schCancelJob(SSchJob *pJob) {
  // TODO: move all tasks from the exec list to the fail list.
  (void)pJob;

  return TSDB_CODE_SUCCESS;
}

/*
 * Close the job ref set (schMgmt.jobRef) once the scheduler is exiting.
 *
 * Does nothing until schMgmt.exit is set. Under schMgmt.lock, the set is closed and
 * jobRef reset to -1 only when schMgmt.jobNum is <= 0 and the set is still open.
 */
void schCloseJobRef(void) {
  if (0 == atomic_load_8((int8_t *)&schMgmt.exit)) {
    return;
  }

  SCH_LOCK(SCH_WRITE, &schMgmt.lock);

  bool noJobLeft = (atomic_load_32(&schMgmt.jobNum) <= 0);
  if (noJobLeft && schMgmt.jobRef >= 0) {
    taosCloseRef(schMgmt.jobRef);
    schMgmt.jobRef = -1;
  }

  SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
}

void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
L
Liu Jicong 已提交
2332
  int64_t  refId = pJob->refId;
D
dapan1121 已提交
2333 2334 2335 2336 2337 2338 2339

  if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
    schCancelJob(pJob);
  }

  schDropJobAllTasks(pJob);

L
Liu Jicong 已提交
2340 2341
  pJob->subPlans = NULL;  // it is a reference to pDag->pSubplans

D
dapan1121 已提交
2342
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
L
Liu Jicong 已提交
2343
  for (int32_t i = 0; i < numOfLevels; ++i) {
D
dapan1121 已提交
2344 2345
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

D
dapan1121 已提交
2346
    schFreeFlowCtrl(pLevel);
L
Liu Jicong 已提交
2347

D
dapan1121 已提交
2348
    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
L
Liu Jicong 已提交
2349 2350
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
2351 2352 2353 2354 2355
      schFreeTask(pTask);
    }

    taosArrayDestroy(pLevel->subTasks);
  }
L
Liu Jicong 已提交
2356

D
dapan1121 已提交
2357 2358 2359
  taosHashCleanup(pJob->execTasks);
  taosHashCleanup(pJob->failTasks);
  taosHashCleanup(pJob->succTasks);
L
Liu Jicong 已提交
2360

D
dapan1121 已提交
2361 2362
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
L
Liu Jicong 已提交
2363

D
dapan1121 已提交
2364 2365
  qExplainFreeCtx(pJob->explainCtx);

wafwerar's avatar
wafwerar 已提交
2366 2367
  taosMemoryFreeClear(pJob->resData);
  taosMemoryFreeClear(pJob);
D
dapan1121 已提交
2368

L
Liu Jicong 已提交
2369
  qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
2370 2371 2372 2373

  atomic_sub_fetch_32(&schMgmt.jobNum, 1);

  schCloseJobRef();
D
dapan1121 已提交
2374 2375
}

L
Liu Jicong 已提交
2376
/*
 * Create a job for pDag, launch it and, when syncSchedule is set, wait on
 * pJob->rspSem until a response arrives. On success *job receives the job's refId.
 *
 * Error handling: once schInitJob() has succeeded the job is registered in
 * schMgmt.jobRef and holds a reference acquired on our behalf (the success path
 * releases it with schReleaseJob). Such a job must be torn down through the ref set,
 * not freed directly. Other holders (e.g. code that reached it through
 * schAcquireJob) may still reference it, and the ref set would otherwise keep a
 * dangling pointer.
 */
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                              int64_t startTs, bool needRes, bool syncSchedule) {
  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
  }

  int32_t  code = 0;
  bool     registered = false;
  SSchJob *pJob = NULL;

  SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, needRes, syncSchedule));
  registered = true;

  SCH_ERR_JRET(schLaunchJob(pJob));

  *job = pJob->refId;

  if (syncSchedule) {
    SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  if (!registered) {
    // schInitJob() failed: keep the original handling for whatever it left behind.
    schFreeJobImpl(pJob);
  } else {
    int64_t refId = pJob->refId;
    // Remove while still holding our reference, then drop it. schFreeJobImpl runs
    // (as the ref set's free callback) once the last holder releases the job.
    taosRemoveRef(schMgmt.jobRef, refId);
    schReleaseJob(refId);
  }

  SCH_RET(code);
}

/*
 * Execute a static EXPLAIN.
 *
 * The explain result is produced locally by qExecStaticExplain() into pJob->resData
 * and no task is sent. The job is registered in schMgmt.jobRef with status
 * PARTIAL_SUCCEED, and its refId is returned in *job. transport, pNodeList and
 * syncSchedule are currently unused.
 */
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                             bool syncSchedule) {
  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Every path that frees this job goes through schFreeJobImpl(), which decrements
  // schMgmt.jobNum. Count the job as soon as it exists to keep the counter balanced;
  // otherwise schCloseJobRef() may close the ref set while other jobs are alive.
  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  pJob->sql = sql;
  pJob->attr.queryJob = true;
  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->queryId = pDag->queryId;
  pJob->subPlans = pDag->pSubplans;

  SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));

  int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  if (NULL == schAcquireJob(refId)) {
    // The job is registered now, so it must not be freed directly (and may already be
    // freed if it was removed concurrently). Do not touch pJob; remove it from the ref
    // set, which frees it through schFreeJobImpl when no holder is left.
    qError("QID:0x%" PRIx64 " schAcquireJob job failed, refId:%" PRIx64, pDag->queryId, refId);
    taosRemoveRef(schMgmt.jobRef, refId);
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
  *job = pJob->refId;
  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  // Only reached before the job was registered in the ref set: free it directly.
  schFreeJobImpl(pJob);
  SCH_RET(code);
}

/*
 * Initialize the global scheduler state.
 *
 * Copies cfg (when given) and substitutes defaults for an unset maxJobNum and a
 * non-positive maxNodeTableNum. It then opens the job ref set (freeing through
 * schFreeJobImpl), creates the heartbeat connection table and generates the
 * scheduler id.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT if the scheduler is already initialized. On any
 * later failure, everything created here is released again and schMgmt.jobRef is
 * reset to -1, so initialization can be retried.
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  if (schMgmt.jobRef >= 0) {
    qError("scheduler already initialized");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (cfg) {
    schMgmt.cfg = *cfg;

    if (schMgmt.cfg.maxJobNum == 0) {
      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    }
    if (schMgmt.cfg.maxNodeTableNum <= 0) {
      schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
    }
  } else {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
  }

  int32_t code = 0;

  schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
  if (schMgmt.jobRef < 0) {
    qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  schMgmt.hbConnections = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == schMgmt.hbConnections) {
    qError("taosHashInit hb connections failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
    qError("generate schdulerId failed, errno:%d", errno);
    SCH_ERR_JRET(TSDB_CODE_QRY_SYS_ERROR);
  }

  qInfo("scheduler %" PRIx64 " initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);

  return TSDB_CODE_SUCCESS;

_return:

  // Undo the partial initialization. Leaving jobRef >= 0 here would make every later
  // schedulerInit() fail with "already initialized".
  if (schMgmt.hbConnections) {
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
  }
  if (schMgmt.jobRef >= 0) {
    taosCloseRef(schMgmt.jobRef);
  }
  schMgmt.jobRef = -1;

  SCH_RET(code);
}

/*
 * Synchronously execute a query plan and collect its outcome into pRes.
 *
 * On success *pJob holds the job's refId. pRes->code receives the job's error code,
 * pRes->numOfRows its row count, and, when needRes is set, pRes->res takes ownership
 * of the job's result data (the job's pointer is cleared).
 * Returns TSDB_CODE_SCH_STATUS_ERROR if the job can no longer be acquired afterwards.
 */
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
                         int64_t startTs, bool needRes, SQueryResult *pRes) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
    SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
  } else {
    SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, needRes, true));
  }

  // The exec helpers release their reference before returning, so the job may have
  // been removed meanwhile (e.g. by schedulerDestroy). Never dereference a NULL job.
  SSchJob *job = schAcquireJob(*pJob);
  if (NULL == job) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, *pJob);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pRes->code = atomic_load_32(&job->errCode);
  pRes->numOfRows = job->resNumOfRows;
  if (needRes) {
    pRes->res = job->resData;
    job->resData = NULL;
  }

  schReleaseJob(*pJob);

  return TSDB_CODE_SUCCESS;
}

/*
 * Start executing a query plan without waiting for it to finish.
 *
 * Static EXPLAIN plans go through schExecStaticExplain(); everything else goes through
 * schExecJobImpl() with startTs 0, needRes false and syncSchedule false.
 * On success *pJob holds the job's refId.
 */
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
  bool badInput = (NULL == transport) || (NULL == pDag) || (NULL == pDag->pSubplans) || (NULL == pJob);
  if (badInput) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t execCode;
  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
    execCode = schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false);
  } else {
    execCode = schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false, false);
  }
  SCH_ERR_RET(execCode);

  return TSDB_CODE_SUCCESS;
}

#if 0
/*
 * Disabled code, kept for reference. If it is re-enabled, note that it relies on
 * schGenUUID(), which is compiled out as well.
 *
 * schedulerConvertDagToTaskList: convert a single-level DAG into a list of STaskInfo,
 * one SSubQueryMsg per subplan. On success *pTasks owns the list; release it with
 * schedulerFreeTaskList().
 */
int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
  if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = LIST_LENGTH(pDag->pSubplans);
  if (1 != levelNum) {
    qError("invalid level num: %d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SNodeListNode *plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, 0);
  int32_t taskNum = LIST_LENGTH(plans->pNodeList);
  if (taskNum <= 0) {
    qError("invalid task num: %d", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo));
  if (NULL == info) {
    qError("taosArrayInit %d taskInfo failed", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  STaskInfo tInfo = {0};
  char *msg = NULL;
  int32_t msgLen = 0;
  int32_t code = 0;

  for (int32_t i = 0; i < taskNum; ++i) {
    SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, i);
    tInfo.addr = plan->execNode;

    code = qSubPlanToString(plan, &msg, &msgLen);
    if (TSDB_CODE_SUCCESS != code) {
      qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);
      SCH_ERR_JRET(code);
    }

    if (NULL == msg) {
      qError("subplanToString returned no msg, len:%d", msgLen);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;
    SSubQueryMsg* pMsg = taosMemoryCalloc(1, msgSize);
    if (NULL == pMsg) {
      qError("calloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pMsg->header.vgId = tInfo.addr.nodeId;

    pMsg->sId      = schMgmt.sId;
    pMsg->queryId  = plan->id.queryId;
    pMsg->taskId   = schGenUUID();
    pMsg->taskType = TASK_TYPE_PERSISTENT;
    pMsg->phyLen   = msgLen;
    pMsg->sqlLen   = 0;
    memcpy(pMsg->msg, msg, msgLen);

    // The serialized plan has been copied into pMsg; release it before the next
    // iteration overwrites msg (the original leaked it on every iteration).
    taosMemoryFreeClear(msg);

    tInfo.msg = pMsg;

    if (NULL == taosArrayPush(info, &tInfo)) {
      qError("taosArrayPush failed, idx:%d", i);
      taosMemoryFree(pMsg);  // pMsg is not owned by the list yet
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  *pTasks = info;
  info = NULL;

_return:
  taosMemoryFreeClear(msg);
  schedulerFreeTaskList(info);
  SCH_RET(code);
}

/*
 * Make copyNum copies of src (each with its own message and a fresh task id) in a new
 * array stored in *dst. On failure *dst is freed and set to NULL.
 */
int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
  if (NULL == src || NULL == src->msg || NULL == dst || copyNum <= 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;

  *dst = taosArrayInit(copyNum, sizeof(STaskInfo));
  if (NULL == *dst) {
    qError("taosArrayInit %d taskInfo failed", copyNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  int32_t msgSize = src->msg->phyLen + sizeof(*src->msg);
  STaskInfo info = {0};

  info.addr = src->addr;

  for (int32_t i = 0; i < copyNum; ++i) {
    info.msg = taosMemoryMalloc(msgSize);
    if (NULL == info.msg) {
      qError("malloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(info.msg, src->msg, msgSize);

    info.msg->taskId = schGenUUID();

    if (NULL == taosArrayPush(*dst, &info)) {
      qError("taosArrayPush failed, idx:%d", i);
      taosMemoryFree(info.msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  schedulerFreeTaskList(*dst);
  *dst = NULL;

  SCH_RET(code);
}
#endif

int32_t schedulerFetchRows(int64_t job, void **pData) {
D
dapan1121 已提交
2663
  if (NULL == pData) {
D
dapan1121 已提交
2664
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
2665 2666
  }

L
Liu Jicong 已提交
2667
  int32_t  code = 0;
D
dapan1121 已提交
2668
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2669 2670 2671 2672
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2673

D
dapan1121 已提交
2674 2675
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (status == JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
2676
    SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2677
    schReleaseJob(job);
D
dapan1121 已提交
2678
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
2679 2680
  }

D
dapan1121 已提交
2681
  if (!SCH_JOB_NEED_FETCH(pJob)) {
D
dapan1121 已提交
2682
    SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
D
dapan1121 已提交
2683
    schReleaseJob(job);
D
dapan1121 已提交
2684
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
2685 2686
  }

D
dapan1121 已提交
2687 2688
  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
D
dapan1121 已提交
2689
    schReleaseJob(job);
D
dapan1121 已提交
2690
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
2691 2692
  }

D
dapan1121 已提交
2693
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2694
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2695 2696
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
  } else if (status == JOB_TASK_STATUS_SUCCEED) {
D
dapan1121 已提交
2697
    SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2698 2699
    goto _return;
  } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan1121 已提交
2700
    if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
D
dapan1121 已提交
2701 2702
      SCH_ERR_JRET(schFetchFromRemote(pJob));
      tsem_wait(&pJob->rspSem);
H
Hongze Cheng 已提交
2703
    }
D
dapan1121 已提交
2704 2705 2706
  } else {
    SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan 已提交
2707 2708
  }

D
dapan1121 已提交
2709
  status = SCH_GET_JOB_STATUS(pJob);
D
dapan 已提交
2710

D
dapan1121 已提交
2711
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2712
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2713
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
D
dapan 已提交
2714
  }
L
Liu Jicong 已提交
2715

D
dapan1121 已提交
2716
  if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
D
dapan1121 已提交
2717
    SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
D
dapan 已提交
2718 2719
  }

D
dapan1121 已提交
2720
  while (true) {
D
dapan1121 已提交
2721 2722
    *pData = atomic_load_ptr(&pJob->resData);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
D
dapan1121 已提交
2723 2724 2725 2726 2727
      continue;
    }

    break;
  }
D
dapan 已提交
2728

D
dapan1121 已提交
2729
  if (NULL == *pData) {
wafwerar's avatar
wafwerar 已提交
2730
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
2731 2732 2733 2734 2735
    if (rsp) {
      rsp->completed = 1;
    }

    *pData = rsp;
D
dapan1121 已提交
2736
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
D
dapan1121 已提交
2737
  }
D
dapan1121 已提交
2738

2739
  SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
D
dapan1121 已提交
2740 2741 2742 2743

_return:

  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
L
Liu Jicong 已提交
2744

D
dapan1121 已提交
2745
  schReleaseJob(job);
D
dapan 已提交
2746

D
dapan1121 已提交
2747
  SCH_RET(code);
D
dapan 已提交
2748
}
D
dapan1121 已提交
2749

D
dapan1121 已提交
2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764
/*
 * Append a SQuerySubDesc (task id and status) for every task of the job to pSub,
 * walking levels from the last one to the first.
 *
 * The job reference acquired here is released on every path; previously it was never
 * released, so the job could never be freed. Returns TSDB_CODE_QRY_INVALID_INPUT for a
 * NULL pSub, TSDB_CODE_SCH_STATUS_ERROR if the job is unknown or not executable, and
 * TSDB_CODE_QRY_OUT_OF_MEMORY if appending to pSub fails.
 */
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
  if (NULL == pSub) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t  code = 0;
  SSchJob *pJob = schAcquireJob(job);
  if (NULL == pJob) {
    qDebug("acquire job from jobRef list failed, may not started or dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) {
    qDebug("job not initialized or not executable job, refId:%" PRIx64, job);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask     *pTask = taosArrayGet(pLevel->subTasks, m);
      SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};

      if (NULL == taosArrayPush(pSub, &subDesc)) {
        qError("taosArrayPush task status failed, refId:%" PRIx64, job);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

_return:

  schReleaseJob(job);

  SCH_RET(code);
}

int32_t scheduleCancelJob(int64_t job) {
D
dapan1121 已提交
2778
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2779 2780 2781 2782
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2783

D
dapan1121 已提交
2784 2785
  int32_t code = schCancelJob(pJob);

D
dapan1121 已提交
2786
  schReleaseJob(job);
D
dapan1121 已提交
2787 2788

  SCH_RET(code);
D
dapan1121 已提交
2789 2790
}

D
dapan1121 已提交
2791
void schedulerFreeJob(int64_t job) {
D
dapan1121 已提交
2792
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2793
  if (NULL == pJob) {
D
dapan1121 已提交
2794
    qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
D
dapan 已提交
2795 2796
    return;
  }
D
dapan1121 已提交
2797

D
dapan1121 已提交
2798 2799
  if (atomic_load_8(&pJob->userFetch) > 0) {
    schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
2800
  }
D
dapan1121 已提交
2801

D
dapan1121 已提交
2802
  SCH_JOB_DLOG("start to remove job from jobRef list, refId:%" PRIx64, job);
2803

D
dapan1121 已提交
2804 2805
  if (taosRemoveRef(schMgmt.jobRef, job)) {
    SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job);
2806
  }
D
dapan1121 已提交
2807 2808

  schReleaseJob(job);
D
dapan1121 已提交
2809
}
D
dapan1121 已提交
2810 2811 2812 2813 2814 2815 2816 2817 2818

/*
 * Free a task list: the message of every STaskInfo entry, then the array itself.
 * A NULL list is ignored.
 */
void schedulerFreeTaskList(SArray *taskList) {
  if (taskList == NULL) {
    return;
  }

  int32_t taskCnt = taosArrayGetSize(taskList);
  for (int32_t idx = 0; idx < taskCnt; idx++) {
    STaskInfo *pInfo = taosArrayGet(taskList, idx);
    taosMemoryFreeClear(pInfo->msg);
  }

  taosArrayDestroy(taskList);
}

void schedulerDestroy(void) {
2826 2827
  atomic_store_8((int8_t *)&schMgmt.exit, 1);

D
dapan1121 已提交
2828
  if (schMgmt.jobRef >= 0) {
D
dapan1121 已提交
2829
    SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
H
Hongze Cheng 已提交
2830
    int64_t  refId = 0;
C
Cary Xu 已提交
2831

D
dapan1121 已提交
2832
    while (pJob) {
D
dapan1121 已提交
2833
      refId = pJob->refId;
C
Cary Xu 已提交
2834 2835 2836
      if (refId == 0) {
        break;
      }
D
dapan1121 已提交
2837
      taosRemoveRef(schMgmt.jobRef, pJob->refId);
L
Liu Jicong 已提交
2838

D
dapan1121 已提交
2839
      pJob = taosIterateRef(schMgmt.jobRef, refId);
D
dapan1121 已提交
2840
    }
D
dapan1121 已提交
2841
  }
D
dapan1121 已提交
2842 2843

  if (schMgmt.hbConnections) {
H
Hongze Cheng 已提交
2844
    void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
D
dapan1121 已提交
2845 2846 2847 2848
    while (pIter != NULL) {
      SSchHbTrans *hb = pIter;
      schFreeRpcCtx(&hb->rpcCtx);
      pIter = taosHashIterate(schMgmt.hbConnections, pIter);
H
Hongze Cheng 已提交
2849
    }
D
dapan1121 已提交
2850 2851 2852
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
  }
D
dapan1121 已提交
2853
}