/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "catalog.h"
H
Hongze Cheng 已提交
17
#include "command.h"
L
Liu Jicong 已提交
18
#include "query.h"
D
dapan1121 已提交
19
#include "schedulerInt.h"
H
Hongze Cheng 已提交
20
#include "tmsg.h"
D
dapan1121 已提交
21
#include "tref.h"
D
dapan1121 已提交
22
#include "trpc.h"
23

D
dapan1121 已提交
24
SSchedulerMgmt schMgmt = {
25
    .jobRef = -1,
D
dapan1121 已提交
26
};
D
dapan1121 已提交
27

L
Liu Jicong 已提交
28
/* Acquire the entry registered under refId in schMgmt.jobRef and return it as a job pointer. */
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
  void *pRef = taosAcquireRef(schMgmt.jobRef, refId);
  return (SSchJob *)pRef;
}
D
dapan1121 已提交
29

L
Liu Jicong 已提交
30
/* Release the entry registered under refId in schMgmt.jobRef; returns taosReleaseRef's result unchanged. */
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
  int32_t rc = taosReleaseRef(schMgmt.jobRef, refId);
  return rc;
}
D
dapan1121 已提交
31

L
Liu Jicong 已提交
32
/* Atomically increment schMgmt.taskId by one and return the incremented value as the new task id. */
uint64_t schGenTaskId(void) {
  uint64_t nextId = atomic_add_fetch_64(&schMgmt.taskId, 1);
  return nextId;
}
D
dapan1121 已提交
33

L
Liu Jicong 已提交
34
#if 0
/*
 * Compiled out. Builds a 64-bit id from four bit fields:
 *   bits 52..63  low 12 bits of a hash of the system UUID
 *   bits 40..51  low 12 bits of the process id
 *   bits 16..39  low 24 bits of the millisecond timestamp
 *   bits  0..15  low 16 bits of a per-process serial counter
 */
uint64_t schGenUUID(void) {
  static uint64_t hashId = 0;
  static int32_t  requestSerialId = 0;

  // The UUID hash is computed on first use and retried while it is still 0.
  if (0 == hashId) {
    char uid[64];
    if (TSDB_CODE_SUCCESS == taosGetSystemUUID(uid, tListLen(uid))) {
      hashId = MurmurHash3_32(uid, strlen(uid));
    } else {
      qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
  }

  int64_t  ts = taosGetTimestampMs();
  uint64_t pid = taosGetPId();
  int32_t  val = atomic_add_fetch_32(&requestSerialId, 1);

  uint64_t hashBits = (hashId & 0x0FFF) << 52;
  uint64_t pidBits = (pid & 0x0FFF) << 40;
  uint64_t tsBits = (ts & 0xFFFFFF) << 16;
  uint64_t serialBits = (val & 0xFFFF);

  return hashBits | pidBits | tsBits | serialBits;
}
#endif
D
dapan1121 已提交
57

L
Liu Jicong 已提交
58 59 60
/**
 * Fill in a freshly zeroed task: bind it to its sub-plan and level, mark it NOT_START,
 * give it a new task id and allocate its execNodes array.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when execNodes cannot be allocated.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  pTask->level = pLevel;
  pTask->plan = pPlan;
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
  pTask->taskId = schGenTaskId();

  SArray *pNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo));
  pTask->execNodes = pNodes;
  if (!pNodes) {
    SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
72
/**
 * Allocate a scheduler job for the query plan pDag, build its levels/tasks and register it
 * in schMgmt.jobRef.
 *
 * @param pSchJob      out: set to the new job on success only
 * @param pDag         query plan; supplies queryId, explain mode and the sub-plans
 * @param transport    stored in the job as-is
 * @param pNodeList    optional node list; a duplicate is stored in the job when non-NULL
 * @param sql          stored in the job by pointer (not copied)
 * @param startTs      passed to qExecExplainBegin for explain jobs
 * @param needRes      stored in the job attributes
 * @param syncSchedule stored in the job attributes
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed.
 *
 * On success the job has been acquired once through schAcquireJob(); this function does not
 * release that reference. On failure the job is torn down: through schFreeJobImpl() when it
 * was never registered, through taosRemoveRef() otherwise.
 */
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
                   int64_t startTs, bool needRes, bool syncSchedule) {
  int32_t  code = 0;
  int64_t  refId = -1;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->attr.syncSchedule = syncSchedule;
  pJob->attr.needRes = needRes;
  pJob->transport = transport;
  pJob->sql = sql;

  if (pNodeList != NULL) {
    pJob->nodeList = taosArrayDup(pNodeList);
    if (NULL == pJob->nodeList) {
      // Fail here instead of continuing with a job that silently has no node list.
      qError("QID:%" PRIx64 " taosArrayDup nodeList failed", pDag->queryId);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs));
  }

  // Three task-id keyed hashes track tasks by state: executing, succeeded, failed.
  pJob->execTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->succTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->succTasks) {
    SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->failTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->failTasks) {
    SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&pJob->rspSem, 0, 0);

  refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_NOT_START;

  *pSchJob = pJob;

  return TSDB_CODE_SUCCESS;

_return:

  // refId < 0 means the job was never registered, so it has to be freed directly.
  if (refId < 0) {
    schFreeJobImpl(pJob);
  } else {
    taosRemoveRef(schMgmt.jobRef, refId);
  }
  SCH_RET(code);
}

D
dapan1121 已提交
154 155 156 157 158 159 160 161
/**
 * Release what an RPC context holds: every value stored in pCtx->args (through the value's
 * own freeFunc), the args hash itself, and the brokenVal value. pCtx itself is not freed.
 * A NULL pCtx is a no-op.
 */
void schFreeRpcCtx(SRpcCtx *pCtx) {
  if (NULL == pCtx) {
    return;
  }

  void *pIter = taosHashIterate(pCtx->args, NULL);
  while (pIter) {
    SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;

    // Same guard as for brokenVal below: skip values that carry no free callback.
    if (ctxVal->freeFunc) {
      (*ctxVal->freeFunc)(ctxVal->val);
    }

    pIter = taosHashIterate(pCtx->args, pIter);
  }

  taosHashCleanup(pCtx->args);

  if (pCtx->brokenVal.freeFunc) {
    (*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
  }
}

L
Liu Jicong 已提交
174
/*
 * Release the buffers a task holds: its message buffer and the candidateAddrs, children,
 * parents and execNodes arrays. The task structure itself is not freed.
 */
void schFreeTask(SSchTask *pTask) {
  taosMemoryFreeClear(pTask->msg);

  SArray *ownedArrays[] = {pTask->candidateAddrs, pTask->children, pTask->parents, pTask->execNodes};
  const int32_t ownedNum = (int32_t)(sizeof(ownedArrays) / sizeof(ownedArrays[0]));

  for (int32_t idx = 0; idx < ownedNum; ++idx) {
    if (NULL != ownedArrays[idx]) {
      taosArrayDestroy(ownedArrays[idx]);
    }
  }
}

D
dapan1121 已提交
194 195 196 197 198 199
/**
 * Read the job status once and report whether it is one of FAILED, CANCELLED, CANCELLING,
 * DROPPING or SUCCEED.
 *
 * @param pStatus optional out: receives the status value that was read
 */
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  const int8_t status = SCH_GET_JOB_STATUS(pJob);

  if (NULL != pStatus) {
    *pStatus = status;
  }

  switch (status) {
    case JOB_TASK_STATUS_FAILED:
    case JOB_TASK_STATUS_CANCELLED:
    case JOB_TASK_STATUS_CANCELLING:
    case JOB_TASK_STATUS_DROPPING:
    case JOB_TASK_STATUS_SUCCEED:
      return true;
    default:
      return false;
  }
}

D
dapan1121 已提交
205
/**
 * Check that a received response type is consistent with the task's last sent message type
 * and with the task status, and reset the task's last message type to -1 once accepted.
 *
 * Unless a case overrides it, the request type expected for a response is msgType - 1.
 *
 * @return TSDB_CODE_SUCCESS when the response is accepted,
 *         TSDB_CODE_SCH_STATUS_ERROR on a type or status mismatch,
 *         TSDB_CODE_QRY_INVALID_INPUT for a response type this function does not know.
 */
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
  int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
  int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  int32_t reqMsgType = msgType - 1;

  switch (msgType) {
    case TDMT_SCH_LINK_BROKEN:
    case TDMT_VND_EXPLAIN_RSP:
      // Accepted without any check and without touching the last message type.
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_QUERY_RSP:  // query_rsp may be processed later than ready_rsp
      // Mismatches are only logged at debug level here; the response is still accepted.
      if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
        SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
                      TMSG_INFO(msgType));
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_RES_READY_RSP:
      // This response is matched against the query request rather than msgType - 1.
      reqMsgType = TDMT_VND_QUERY;
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
                      (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_FETCH_RSP:
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_CREATE_TABLE_RSP:
    case TDMT_VND_DROP_TABLE_RSP:
    case TDMT_VND_ALTER_TABLE_RSP:
    case TDMT_VND_SUBMIT_RSP:
      // These go through the strict checks below.
      break;
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (lastMsgType != reqMsgType) {
    // lastMsgType is -1 after an earlier response was accepted; do not hand that to TMSG_INFO.
    SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
                  (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
    SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                  TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);

  return TSDB_CODE_SUCCESS;
}

/**
 * Validate the transition from the job's current status to newStatus and apply it with a
 * compare-and-exchange, re-reading and re-validating whenever the status changed in between.
 *
 * Transitions accepted by the switch below:
 *   NULL            -> NOT_START
 *   NOT_START       -> EXECUTING
 *   EXECUTING       -> PARTIAL_SUCCEED | FAILED | CANCELLING | CANCELLED | DROPPING
 *   PARTIAL_SUCCEED -> FAILED | SUCCEED | DROPPING
 *   SUCCEED, FAILED, CANCELLING -> DROPPING
 * A job already CANCELLED or DROPPING yields TSDB_CODE_QRY_JOB_FREED; any other rejected
 * transition, including newStatus equal to the current status, yields TSDB_CODE_QRY_APP_ERROR.
 */
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    if (newStatus == oriStatus) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        if (!(JOB_TASK_STATUS_NOT_START == newStatus)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_NOT_START:
        if (!(JOB_TASK_STATUS_EXECUTING == newStatus)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_EXECUTING:
        if (!(JOB_TASK_STATUS_PARTIAL_SUCCEED == newStatus || JOB_TASK_STATUS_FAILED == newStatus ||
              JOB_TASK_STATUS_CANCELLING == newStatus || JOB_TASK_STATUS_CANCELLED == newStatus ||
              JOB_TASK_STATUS_DROPPING == newStatus)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_PARTIAL_SUCCEED:
        if (!(JOB_TASK_STATUS_FAILED == newStatus || JOB_TASK_STATUS_SUCCEED == newStatus ||
              JOB_TASK_STATUS_DROPPING == newStatus)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_SUCCEED:
      case JOB_TASK_STATUS_FAILED:
      case JOB_TASK_STATUS_CANCELLING:
        if (!(JOB_TASK_STATUS_DROPPING == newStatus)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_CANCELLED:
      case JOB_TASK_STATUS_DROPPING:
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;

      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    // The exchange returns the value it observed; a match means our update was applied.
    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      break;
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  SCH_ERR_RET(code);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
360
/**
 * Wire up parent/child links between the tasks of a job.
 *
 * For every task, the children and parents of its sub-plan are looked up in planToTask
 * (keyed by sub-plan pointer, value is a task pointer) and the resulting task pointers are
 * appended to pTask->children / pTask->parents.
 *
 * Plan shape rules enforced here:
 *   - a task on the level equal to pJob->levelIdx must not have children;
 *   - a task on level 0 must not have parents, and a task on any other level must have some;
 *   - a query job must have at most one task on level 0.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR for a malformed plan,
 *         or TSDB_CODE_QRY_OUT_OF_MEMORY when an array cannot be created or grown.
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n indexes the parent list here, so the message says parentIdx.
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) {
    SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
442
/**
 * Copy the candidate address at pTask->candidateIdx into pTask->succeedAddr.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when that candidate entry
 *         cannot be fetched.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pChosen = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  if (!pChosen) {
    SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                  (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  pTask->succeedAddr = *pChosen;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
455 456
/**
 * Append an (address, handle) pair to the task's execNodes array.
 *
 * @param addr   node address; copied by value
 * @param handle stored as-is next to the address
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the append fails.
 */
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) {
  SSchNodeInfo execNode = {0};
  execNode.addr = *addr;
  execNode.handle = handle;

  void *pSlot = taosArrayPush(pTask->execNodes, &execNode);
  if (!pSlot) {
    SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode recorded, handle:%p", handle);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
467

X
Xiaoyu Wang 已提交
468
/**
 * Validate the query plan and build the job's level/task structure from it.
 *
 * Each entry of pDag->pSubplans becomes one SSchLevel; each sub-plan in that entry's node
 * list becomes one SSchTask stored in the level's subTasks array. A temporary hash from
 * sub-plan pointer to task pointer is filled while building and then handed to
 * schBuildTaskRalation() to link parents and children; it is destroyed before returning.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for an empty or malformed plan,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure, or the error returned by
 *         schInitTask() / schBuildTaskRalation().
 *
 * On failure the levels and tasks already stored in pJob are left in place; this function
 * does not free them.
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Keys are sub-plan pointers, so the hash function is chosen by pointer width.
  SHashObj *planToTask = taosHashInit(
      SCHEDULE_DEFAULT_MAX_TASK_NUM,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  pJob->subPlans = pDag->pSubplans;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    // Push a template level, then fill in the copy that now lives inside the array.
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = &task;  // the task log macros below refer to pTask

      SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));

      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        // The task never reached the array, so nothing else can free what schInitTask allocated.
        schFreeTask(&task);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      // Map the sub-plan to the task copy stored in the array, not to the stack-local one.
      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:
  // Reached on success as well: the lookup hash is only needed while building.
  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
576 577
/**
 * Populate pTask->candidateAddrs, the list of node addresses the task may be sent to.
 *
 * Does nothing when the list already exists. Otherwise, if the task's plan has an execNode
 * with at least one endpoint, that single node is the only candidate; if not, up to
 * SCH_MAX_CANDIDATE_EP_NUM entries are copied from pJob->nodeList.
 *
 * On any failure the partially built list is destroyed and pTask->candidateAddrs is reset
 * to NULL, so a later call starts over instead of taking the early return with an empty list.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure, or
 *         TSDB_CODE_QRY_INVALID_INPUT when no candidate could be added.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      // Log first so errno still reflects the failed push.
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      taosArrayDestroy(pTask->candidateAddrs);
      pTask->candidateAddrs = NULL;
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  int32_t addNum = 0;
  int32_t nodeNum = 0;
  if (pJob->nodeList) {
    nodeNum = (int32_t)taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
        taosArrayDestroy(pTask->candidateAddrs);
        pTask->candidateAddrs = NULL;
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    taosArrayDestroy(pTask->candidateAddrs);
    pTask->candidateAddrs = NULL;
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
632

D
dapan1121 已提交
633
/**
 * Insert the task into the job's execTasks hash, keyed by task id.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SCH_INTERNAL_ERROR when the task id is already
 *         present; TSDB_CODE_QRY_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putRes = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);

  if (putRes != 0) {
    if (HASH_NODE_EXIST(putRes)) {
      SCH_TASK_ELOG("task already in execTask list, code:%x", putRes);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
650 651
/**
 * Move a task from the job's execTasks hash to its succTasks hash.
 *
 * A failed removal from execTasks is only logged as a warning; the insertion into
 * succTasks is attempted regardless.
 *
 * @param moved out: false when the insertion failed for a reason other than "already
 *              present"; true when the task is in succTasks after the call (newly inserted
 *              or already there). Always written.
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SCH_STATUS_ERROR when the task was already in
 *         succTasks; TSDB_CODE_QRY_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Give the out-parameter a defined value on every path, as schMoveTaskToFailList does.
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
676 677
/**
 * Move a task from the job's execTasks hash to its failTasks hash.
 *
 * A failed removal from execTasks is only logged as a warning; the insertion into
 * failTasks is attempted regardless.
 *
 * @param moved out: starts as false; set to true when the task is in failTasks after the
 *              call (newly inserted or already there).
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SCH_STATUS_ERROR when the task was already in
 *         failTasks; TSDB_CODE_QRY_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  int32_t rmRes = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (rmRes != 0) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t putRes = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (putRes != 0) {
    if (HASH_NODE_EXIST(putRes)) {
      *moved = true;

      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
703 704
/**
 * Move a task from the job's succTasks hash back to its execTasks hash.
 *
 * A failed removal from succTasks is only logged as a warning; the insertion into
 * execTasks is attempted regardless.
 *
 * @param moved out: false when the insertion failed for a reason other than "already
 *              present"; true when the task is in execTasks after the call (newly inserted
 *              or already there). Always written.
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SCH_STATUS_ERROR when the task was already in
 *         execTasks; TSDB_CODE_QRY_OUT_OF_MEMORY for any other insertion failure.
 */
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Give the out-parameter a defined value on every path, as schMoveTaskToFailList does.
  *moved = false;

  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;

      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
728
/*
 * Count one more attempt for the task and decide whether it may be retried.
 *
 * pJob       owning job; no retry when schJobNeedToStop() reports the job must stop
 * pTask      failed task; pTask->tryTimes is incremented unconditionally
 * errCode    error that caused the failure; must satisfy NEED_SCHEDULER_RETRY_ERROR
 * needRetry  out: true only when every check below passes
 *
 * Always returns TSDB_CODE_SUCCESS; the decision is carried by *needRetry.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int8_t status = 0;

  // Pessimistic default; flipped to true only after all checks pass.
  *needRetry = false;
  ++pTask->tryTimes;

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(status));
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) {
    SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  // TODO CHECK epList/candidateList
  if (SCH_IS_DATA_SRC_TASK(pTask)) {
    // Data source tasks: one attempt per endpoint of the plan's exec node.
    int32_t epNum = SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode);
    if (pTask->tryTimes >= epNum) {
      SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes, epNum);
      return TSDB_CODE_SUCCESS;
    }
  } else {
    // Other tasks: retry only while an untried candidate address remains.
    int32_t candidateNum = (int32_t)taosArrayGetSize(pTask->candidateAddrs);
    if (candidateNum <= (pTask->candidateIdx + 1)) {
      SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                    pTask->candidateIdx, candidateNum);
      return TSDB_CODE_SUCCESS;
    }
  }

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

/*
 * Re-launch a task that schTaskCheckSetRetry() approved for retry.
 *
 * Decrements the level's launched-task counter, gives back flow-control quota when the
 * task is flow controlled, advances to the next endpoint/candidate and launches again.
 * Returns the first error reported by a callee, otherwise TSDB_CODE_SUCCESS.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  // Pick the next target: next candidate address, or next endpoint for data source tasks.
  if (!SCH_IS_DATA_SRC_TASK(pTask)) {
    ++pTask->candidateIdx;
  } else {
    SCH_SWITCH_EPSET(&pTask->plan->execNode);
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
794
/*
 * Replace the transport info stored for an existing heartbeat connection.
 *
 * epId   key into schMgmt.hbConnections (hashed over the whole SQueryNodeEpId)
 * trans  new transport instance/handle, copied under the entry's write lock
 *
 * Returns TSDB_CODE_QRY_APP_ERROR when no connection is registered for epId,
 * otherwise TSDB_CODE_SUCCESS.
 */
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  memcpy(&hb->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId,
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830
/*
 * Record an error code on the job.
 *
 * - TSDB_CODE_SUCCESS is ignored.
 * - If the job has no error yet, errCode is installed with a compare-and-exchange.
 * - Otherwise the stored code is overwritten only when it does not satisfy
 *   NEED_CLIENT_HANDLE_ERROR and errCode does.
 * A debug line is logged whenever pJob->errCode was written.
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SUCCESS == errCode) {
    return;
  }

  bool    installed = false;
  int32_t curCode = atomic_load_32(&pJob->errCode);

  if (TSDB_CODE_SUCCESS == curCode) {
    installed = (curCode == atomic_val_compare_exchange_32(&pJob->errCode, curCode, errCode));
    if (!installed) {
      // Lost the race against another writer: re-read what it stored.
      curCode = atomic_load_32(&pJob->errCode);
    }
  }

  if (!installed) {
    if (NEED_CLIENT_HANDLE_ERROR(curCode) || !NEED_CLIENT_HANDLE_ERROR(errCode)) {
      return;
    }

    atomic_store_32(&pJob->errCode, errCode);
  }

  SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
}

/*
 * Common failure path for a job.
 *
 * status   target job status passed to schCheckAndUpdateJobStatus()
 * errCode  error to record via schUpdateJobErrCode()
 *
 * Returns early with the status-update error if the transition is rejected (e.g. the job
 * already FAILED). Otherwise posts pJob->rspSem when a user fetch is pending or the job is
 * synchronously scheduled, and returns the error code now stored on the job.
 */
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
  // if already FAILED, no more processing
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));

  schUpdateJobErrCode(pJob, errCode);

  bool wakeWaiter = atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule;
  if (wakeWaiter) {
    tsem_post(&pJob->rspSem);
  }

  int32_t code = atomic_load_32(&pJob->errCode);
  SCH_JOB_DLOG("job failed with error: %s", tstrerror(code));

  SCH_RET(code);
}

D
dapan1121 已提交
861
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
862 863 864 865 866 867 868 869 870
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode));
}

// Runs the common job-failure path with target status JOB_TASK_STATUS_DROPPING.
// Note: no more error processing, handled in function internal
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  int32_t code = schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode);
  SCH_RET(code);
}

D
dapan1121 已提交
871
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
872 873
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  int32_t code = 0;
L
Liu Jicong 已提交
874

D
dapan1121 已提交
875
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED));
D
dapan 已提交
876

D
dapan1121 已提交
877
  if (pJob->attr.syncSchedule) {
D
dapan 已提交
878
    tsem_post(&pJob->rspSem);
D
dapan 已提交
879
  }
L
Liu Jicong 已提交
880

D
dapan1121 已提交
881 882 883
  if (atomic_load_8(&pJob->userFetch)) {
    SCH_ERR_JRET(schFetchFromRemote(pJob));
  }
D
dapan 已提交
884

D
dapan 已提交
885
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
886 887 888

_return:

D
dapan1121 已提交
889
  SCH_RET(schProcessOnJobFailure(pJob, code));
D
dapan 已提交
890 891
}

892
// Clears the job's remoteFetch flag (1 -> 0) and posts rspSem to wake the waiter.
void schProcessOnDataFetched(SSchJob *job) {
  (void)atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
  (void)tsem_post(&job->rspSem);
}

D
dapan1121 已提交
897
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
898
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
D
dapan1121 已提交
899
  int8_t status = 0;
L
Liu Jicong 已提交
900

D
dapan1121 已提交
901
  if (schJobNeedToStop(pJob, &status)) {
D
dapan1121 已提交
902
    SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
903 904 905
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

L
Liu Jicong 已提交
906 907
  bool    needRetry = false;
  bool    moved = false;
D
dapan1121 已提交
908
  int32_t taskDone = 0;
D
dapan1121 已提交
909
  int32_t code = 0;
D
dapan1121 已提交
910

H
Haojun Liao 已提交
911
  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
L
Liu Jicong 已提交
912

D
dapan1121 已提交
913
  SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));
L
Liu Jicong 已提交
914

D
dapan1121 已提交
915
  if (!needRetry) {
H
Haojun Liao 已提交
916
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
D
dapan1121 已提交
917 918

    if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
D
dapan1121 已提交
919 920
      SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));
    } else {
D
dapan1121 已提交
921
      SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
D
dapan1121 已提交
922
      SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
923 924 925
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
L
Liu Jicong 已提交
926

D
dapan1121 已提交
927
    if (SCH_IS_WAIT_ALL_JOB(pJob)) {
D
dapan1121 已提交
928 929 930 931 932
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskFailed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

D
dapan1121 已提交
933
      schUpdateJobErrCode(pJob, errCode);
L
Liu Jicong 已提交
934

D
dapan1121 已提交
935
      if (taskDone < pTask->level->taskNum) {
L
Liu Jicong 已提交
936
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
D
dapan 已提交
937
        SCH_RET(errCode);
D
dapan1121 已提交
938 939 940
      }
    }
  } else {
D
dapan1121 已提交
941
    SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));
L
Liu Jicong 已提交
942

D
dapan 已提交
943 944
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
945

D
dapan1121 已提交
946 947
_return:

D
dapan1121 已提交
948
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
D
dapan1121 已提交
949 950
}

D
dapan1121 已提交
951
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
952
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
L
Liu Jicong 已提交
953
  bool    moved = false;
D
dapan1121 已提交
954 955
  int32_t code = 0;

D
dapan1121 已提交
956
  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
D
dapan 已提交
957

D
dapan1121 已提交
958
  SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
D
dapan1121 已提交
959

D
dapan1121 已提交
960
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
D
dapan1121 已提交
961

D
dapan1121 已提交
962
  SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
D
dapan1121 已提交
963 964

  SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
L
Liu Jicong 已提交
965

D
dapan1121 已提交
966
  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
D
dapan 已提交
967
  if (parentNum == 0) {
L
Liu Jicong 已提交
968
    int32_t taskDone = 0;
D
dapan1121 已提交
969
    if (SCH_IS_WAIT_ALL_JOB(pJob)) {
D
dapan1121 已提交
970 971 972 973
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
L
Liu Jicong 已提交
974

D
dapan1121 已提交
975
      if (taskDone < pTask->level->taskNum) {
S
Shengliang Guan 已提交
976
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
D
dapan1121 已提交
977
        return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
978
      } else if (taskDone > pTask->level->taskNum) {
D
dapan1121 已提交
979
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
D
dapan1121 已提交
980 981
      }

D
dapan1121 已提交
982
      if (pTask->level->taskFailed > 0) {
D
dapan1121 已提交
983 984 985
        SCH_RET(schProcessOnJobFailure(pJob, 0));
      } else {
        SCH_RET(schProcessOnJobPartialSuccess(pJob));
D
dapan1121 已提交
986 987
      }
    } else {
D
dapan1121 已提交
988
      pJob->resNode = pTask->succeedAddr;
D
dapan1121 已提交
989
    }
D
dapan 已提交
990

D
dapan1121 已提交
991
    pJob->fetchTask = pTask;
D
dapan1121 已提交
992

D
dapan1121 已提交
993
    SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved));
L
Liu Jicong 已提交
994

D
dapan1121 已提交
995
    SCH_RET(schProcessOnJobPartialSuccess(pJob));
D
dapan 已提交
996 997
  }

L
Liu Jicong 已提交
998 999 1000 1001
  /*
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
      strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
      job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
D
dapan 已提交
1002

L
Liu Jicong 已提交
1003 1004 1005
      ++job->dataSrcEps.numOfEps;
    }
  */
D
dapan 已提交
1006

D
dapan 已提交
1007
  for (int32_t i = 0; i < parentNum; ++i) {
D
dapan1121 已提交
1008
    SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
L
Liu Jicong 已提交
1009
    int32_t   readyNum = atomic_add_fetch_32(&par->childReady, 1);
D
dapan 已提交
1010

D
dapan1121 已提交
1011
    SCH_LOCK(SCH_WRITE, &par->lock);
L
Liu Jicong 已提交
1012 1013 1014 1015
    SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
                                    .taskId = pTask->taskId,
                                    .schedId = schMgmt.sId,
                                    .addr = pTask->succeedAddr};
X
Xiaoyu Wang 已提交
1016
    qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
D
dapan1121 已提交
1017
    SCH_UNLOCK(SCH_WRITE, &par->lock);
L
Liu Jicong 已提交
1018

D
dapan 已提交
1019
    if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
D
dapan1121 已提交
1020
      SCH_ERR_RET(schLaunchTaskImpl(pJob, par));
D
dapan 已提交
1021 1022 1023 1024 1025
    }
  }

  return TSDB_CODE_SUCCESS;

D
dapan1121 已提交
1026
_return:
D
dapan 已提交
1027

D
dapan1121 已提交
1028 1029
  SCH_RET(schProcessOnJobFailure(pJob, code));
}
D
dapan 已提交
1030

D
dapan1121 已提交
1031 1032 1033
/*
 * Send a TDMT_VND_FETCH request for the job's fetch task unless one is already in flight.
 *
 * pJob->remoteFetch is used as a 0/1 in-flight flag: it is claimed with a compare-and-exchange
 * and released again when a result is already buffered or when sending fails. A send failure
 * is routed to schProcessOnTaskFailure() for the fetch task.
 * Note: no more error processing, handled in function internal
 */
int32_t schFetchFromRemote(SSchJob *pJob) {
  int32_t code = 0;

  int32_t prevFlag = atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1);
  if (0 != prevFlag) {
    SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
    return TSDB_CODE_SUCCESS;
  }

  void *resData = atomic_load_ptr(&pJob->resData);
  if (NULL != resData) {
    // A result is already buffered: release the flag, nothing to request.
    atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

    SCH_JOB_DLOG("res already fetched, res:%p", resData);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));

  return TSDB_CODE_SUCCESS;

_return:

  atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}

D
dapan1121 已提交
1059 1060
/*
 * Publish a finished explain result on the job.
 *
 * Stores the byte-swapped row count and the response pointer on the job, marks the task
 * JOB_TASK_STATUS_SUCCEED and signals the fetch waiter. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  // numOfRows is converted with htonl() before use, exactly once.
  int32_t rows = htonl(pRsp->numOfRows);

  SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", rows, pRsp->completed);

  atomic_store_32(&pJob->resNumOfRows, rows);
  atomic_store_ptr(&pJob->resData, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);

  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1072
// Note: no more task error processing, handled in function internal
L
Liu Jicong 已提交
1073 1074
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
                             int32_t rspCode) {
D
dapan1121 已提交
1075
  int32_t code = 0;
L
Liu Jicong 已提交
1076 1077
  int8_t  status = 0;

D
dapan1121 已提交
1078
  if (schJobNeedToStop(pJob, &status)) {
L
Liu Jicong 已提交
1079 1080
    SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
                  rspCode);
D
dapan1121 已提交
1081 1082
    SCH_RET(atomic_load_32(&pJob->errCode));
  }
H
Haojun Liao 已提交
1083

D
dapan1121 已提交
1084 1085
  SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));

D
dapan1121 已提交
1086
  switch (msgType) {
H
Haojun Liao 已提交
1087
    case TDMT_VND_CREATE_TABLE_RSP: {
X
Xiaoyu Wang 已提交
1088 1089
      SVCreateTbBatchRsp batchRsp = {0};
      if (msg) {
H
Hongze Cheng 已提交
1090 1091
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
1092 1093 1094 1095
        code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
            SVCreateTbRsp *rsp = batchRsp.pRsps + i;
X
Xiaoyu Wang 已提交
1096
            if (TSDB_CODE_SUCCESS != rsp->code) {
1097
              code = rsp->code;
X
Xiaoyu Wang 已提交
1098 1099
              tDecoderClear(&coder);
              SCH_ERR_JRET(code);
D
dapan 已提交
1100 1101
            }
          }
D
dapan1121 已提交
1102
        }
H
Hongze Cheng 已提交
1103
        tDecoderClear(&coder);
1104
        SCH_ERR_JRET(code);
L
Liu Jicong 已提交
1105 1106
      }

L
Liu Jicong 已提交
1107 1108 1109 1110
      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
X
Xiaoyu Wang 已提交
1111 1112 1113
    case TDMT_VND_DROP_TABLE_RSP: {
      SVDropTbBatchRsp batchRsp = {0};
      if (msg) {
H
Hongze Cheng 已提交
1114 1115
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
X
Xiaoyu Wang 已提交
1116
        code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
1117 1118 1119
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
            SVDropTbRsp *rsp = batchRsp.pRsps + i;
X
Xiaoyu Wang 已提交
1120
            if (TSDB_CODE_SUCCESS != rsp->code) {
1121
              code = rsp->code;
X
Xiaoyu Wang 已提交
1122 1123
              tDecoderClear(&coder);
              SCH_ERR_JRET(code);
X
Xiaoyu Wang 已提交
1124 1125 1126
            }
          }
        }
H
Hongze Cheng 已提交
1127
        tDecoderClear(&coder);
X
Xiaoyu Wang 已提交
1128 1129 1130 1131 1132 1133 1134
        SCH_ERR_JRET(code);
      }

      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
X
Xiaoyu Wang 已提交
1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150
    case TDMT_VND_ALTER_TABLE_RSP: {
      SVAlterTbRsp rsp = {0};
      if (msg) {
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSVAlterTbRsp(&coder, &rsp);
        tDecoderClear(&coder);
        SCH_ERR_JRET(code);
        SCH_ERR_JRET(rsp.code);
      }

      SCH_ERR_JRET(rspCode);

      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
X
Xiaoyu Wang 已提交
1151
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
X
Xiaoyu Wang 已提交
1152 1153
      break;
    }
D
dapan1121 已提交
1154
    case TDMT_VND_SUBMIT_RSP: {
L
Liu Jicong 已提交
1155
      SCH_ERR_JRET(rspCode);
D
dapan1121 已提交
1156

D
dapan 已提交
1157
      if (msg) {
X
Xiaoyu Wang 已提交
1158
        SDecoder    coder = {0};
D
dapan 已提交
1159 1160 1161 1162 1163 1164 1165 1166
        SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp));
        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSSubmitRsp(&coder, rsp);
        if (code) {
          SCH_TASK_ELOG("decode submitRsp failed, code:%d", code);
          tFreeSSubmitRsp(rsp);
          SCH_ERR_JRET(code);
        }
X
Xiaoyu Wang 已提交
1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178

        if (rsp->nBlocks > 0) {
          for (int32_t i = 0; i < rsp->nBlocks; ++i) {
            SSubmitBlkRsp *blk = rsp->pBlocks + i;
            if (TSDB_CODE_SUCCESS != blk->code) {
              code = blk->code;
              tFreeSSubmitRsp(rsp);
              SCH_ERR_JRET(code);
            }
          }
        }

D
dapan 已提交
1179 1180
        atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
        SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
X
Xiaoyu Wang 已提交
1181

D
dapan 已提交
1182 1183 1184 1185 1186 1187 1188 1189 1190 1191
        if (pJob->attr.needRes) {
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
          if (pJob->resData) {
            SSubmitRsp *sum = pJob->resData;
            sum->affectedRows += rsp->affectedRows;
            sum->nBlocks += rsp->nBlocks;
            sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
            memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
            taosMemoryFree(rsp->pBlocks);
            taosMemoryFree(rsp);
X
Xiaoyu Wang 已提交
1192
          } else {
D
dapan 已提交
1193 1194 1195 1196 1197 1198
            pJob->resData = rsp;
          }
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
        } else {
          tFreeSSubmitRsp(rsp);
        }
L
Liu Jicong 已提交
1199
      }
D
dapan1121 已提交
1200

L
Liu Jicong 已提交
1201
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
D
dapan1121 已提交
1202

L
Liu Jicong 已提交
1203 1204
      break;
    }
D
dapan1121 已提交
1205
    case TDMT_VND_QUERY_RSP: {
L
Liu Jicong 已提交
1206 1207
      SQueryTableRsp rsp = {0};
      if (msg) {
D
dapan1121 已提交
1208
        SCH_ERR_JRET(tDeserializeSQueryTableRsp(msg, msgSize, &rsp));
L
Liu Jicong 已提交
1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220
        SCH_ERR_JRET(rsp.code);
      }

      SCH_ERR_JRET(rspCode);

      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      // SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));

      break;
L
Liu Jicong 已提交
1221
    }
D
dapan1121 已提交
1222
    case TDMT_VND_RES_READY_RSP: {
L
Liu Jicong 已提交
1223 1224 1225 1226 1227
      SResReadyRsp *rsp = (SResReadyRsp *)msg;

      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
1228
      }
L
Liu Jicong 已提交
1229 1230
      SCH_ERR_JRET(rsp->code);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
D
dapan1121 已提交
1231

L
Liu Jicong 已提交
1232 1233
      break;
    }
D
dapan1121 已提交
1234 1235 1236 1237 1238
    case TDMT_VND_EXPLAIN_RSP: {
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
H
Hongze Cheng 已提交
1239

D
dapan1121 已提交
1240 1241 1242 1243 1244 1245 1246 1247 1248 1249
      if (!SCH_IS_EXPLAIN_JOB(pJob)) {
        SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (pJob->resData) {
        SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }

D
dapan1121 已提交
1250 1251 1252 1253 1254 1255
      SExplainRsp rsp = {0};
      if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
        taosMemoryFree(rsp.subplanInfo);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

D
dapan1121 已提交
1256
      SRetrieveTableRsp *pRsp = NULL;
D
dapan1121 已提交
1257
      SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));
D
dapan1121 已提交
1258 1259

      if (pRsp) {
D
dapan1121 已提交
1260
        SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
D
dapan1121 已提交
1261 1262 1263
      }
      break;
    }
L
Liu Jicong 已提交
1264 1265
    case TDMT_VND_FETCH_RSP: {
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
D
dapan1121 已提交
1266

L
Liu Jicong 已提交
1267 1268 1269 1270
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
D
dapan1121 已提交
1271

D
dapan1121 已提交
1272
      if (SCH_IS_EXPLAIN_JOB(pJob)) {
H
Hongze Cheng 已提交
1273
        if (rsp->completed) {
D
dapan1121 已提交
1274 1275 1276 1277 1278
          SRetrieveTableRsp *pRsp = NULL;
          SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
          if (pRsp) {
            SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
          }
H
Hongze Cheng 已提交
1279

D
dapan1121 已提交
1280 1281 1282
          return TSDB_CODE_SUCCESS;
        }

D
dapan1121 已提交
1283 1284
        atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

D
dapan1121 已提交
1285 1286 1287 1288 1289
        SCH_ERR_JRET(schFetchFromRemote(pJob));

        return TSDB_CODE_SUCCESS;
      }

X
Xiaoyu Wang 已提交
1290 1291
      if (pJob->resData) {
        SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData);
wafwerar's avatar
wafwerar 已提交
1292
        taosMemoryFreeClear(rsp);
L
Liu Jicong 已提交
1293 1294
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }
H
Haojun Liao 已提交
1295

X
Xiaoyu Wang 已提交
1296
      atomic_store_ptr(&pJob->resData, rsp);
L
Liu Jicong 已提交
1297
      atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
D
dapan1121 已提交
1298

L
Liu Jicong 已提交
1299 1300
      if (rsp->completed) {
        SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
D
dapan1121 已提交
1301
      }
L
Liu Jicong 已提交
1302 1303 1304 1305 1306 1307

      SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

      schProcessOnDataFetched(pJob);
      break;
    }
D
dapan1121 已提交
1308
    case TDMT_VND_DROP_TASK_RSP: {
L
Liu Jicong 已提交
1309 1310 1311 1312 1313
      // SHOULD NEVER REACH HERE
      SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
    }
D
dapan1121 已提交
1314 1315 1316 1317
    case TDMT_SCH_LINK_BROKEN:
      SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
      break;
D
dapan1121 已提交
1318
    default:
D
dapan1121 已提交
1319
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
D
dapan1121 已提交
1320
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
1321 1322 1323 1324 1325
  }

  return TSDB_CODE_SUCCESS;

_return:
D
dapan1121 已提交
1326

D
dapan1121 已提交
1327
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
D
dapan1121 已提交
1328 1329
}

D
dapan1121 已提交
1330
/*
 * Look up a task by id in a task hash.
 *
 * *pTask is written only when a non-NULL task pointer is found; it is left untouched
 * otherwise, so callers must initialise it. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
  if (taosHashGetSize(pTaskList) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSchTask **ppFound = taosHashGet(pTaskList, &taskId, sizeof(taskId));
  if (NULL != ppFound && NULL != (*ppFound)) {
    *pTask = *ppFound;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1346
/*
 * Store the connection handle on the task's exec node entry.
 *
 * Only done when the response succeeded (rspCode == 0) and the task has exactly one exec
 * node recorded; every other case is a no-op. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
  bool singleNode = (0 == rspCode) && (NULL != pTask->execNodes) && (1 == taosArrayGetSize(pTask->execNodes));
  if (!singleNode) {
    return TSDB_CODE_SUCCESS;
  }

  SSchNodeInfo *pNode = taosArrayGet(pTask->execNodes, 0);
  pNode->handle = handle;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1358
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
L
Liu Jicong 已提交
1359
  int32_t                code = 0;
D
dapan1121 已提交
1360
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
X
Xiaoyu Wang 已提交
1361
  SSchTask              *pTask = NULL;
L
Liu Jicong 已提交
1362

D
dapan1121 已提交
1363
  SSchJob *pJob = schAcquireJob(pParam->refId);
D
dapan1121 已提交
1364
  if (NULL == pJob) {
D
dapan1121 已提交
1365
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64,
L
Liu Jicong 已提交
1366
          pParam->queryId, pParam->taskId, pParam->refId);
D
dapan1121 已提交
1367
    SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
1368 1369
  }

D
dapan1121 已提交
1370 1371 1372 1373 1374
  schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
  if (NULL == pTask) {
    if (TDMT_VND_EXPLAIN_RSP == msgType) {
      schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
    } else {
H
Hongze Cheng 已提交
1375 1376
      SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                   pParam->taskId);
D
dapan1121 已提交
1377 1378
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
D
dapan1121 已提交
1379
  }
H
Hongze Cheng 已提交
1380

D
dapan1121 已提交
1381
  if (NULL == pTask) {
H
Hongze Cheng 已提交
1382 1383
    SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                 pParam->taskId);
D
dapan1121 已提交
1384 1385
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }
D
dapan1121 已提交
1386

D
dapan1121 已提交
1387
  SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
L
Liu Jicong 已提交
1388

L
Liu Jicong 已提交
1389
  SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
D
dapan1121 已提交
1390
  schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
D
dapan1121 已提交
1391
  SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
D
dapan1121 已提交
1392

H
Haojun Liao 已提交
1393
_return:
D
dapan1121 已提交
1394
  if (pJob) {
D
dapan1121 已提交
1395
    schReleaseJob(pParam->refId);
D
dapan1121 已提交
1396 1397
  }

wafwerar's avatar
wafwerar 已提交
1398
  taosMemoryFreeClear(param);
D
dapan1121 已提交
1399 1400 1401
  SCH_RET(code);
}

L
Liu Jicong 已提交
1402
int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1403 1404
  return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
H
Haojun Liao 已提交
1405

L
Liu Jicong 已提交
1406
int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
H
Haojun Liao 已提交
1407 1408 1409
  return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}

X
Xiaoyu Wang 已提交
1410 1411 1412 1413
// Callback for drop-table requests: forwards to schHandleCallback() as TDMT_VND_DROP_TABLE_RSP.
int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t msgType = TDMT_VND_DROP_TABLE_RSP;
  return schHandleCallback(param, pMsg, msgType, code);
}

X
Xiaoyu Wang 已提交
1414 1415 1416 1417
// Callback for alter-table requests: forwards to schHandleCallback() as TDMT_VND_ALTER_TABLE_RSP.
int32_t schHandleAlterTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t msgType = TDMT_VND_ALTER_TABLE_RSP;
  return schHandleCallback(param, pMsg, msgType, code);
}

L
Liu Jicong 已提交
1418
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1419 1420
  return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
H
Haojun Liao 已提交
1421

L
Liu Jicong 已提交
1422
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1423 1424
  return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
H
Haojun Liao 已提交
1425

L
Liu Jicong 已提交
1426
int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1427 1428
  return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
H
Haojun Liao 已提交
1429

D
dapan1121 已提交
1430 1431 1432 1433
// Callback for explain requests: forwards to schHandleCallback() as TDMT_VND_EXPLAIN_RSP.
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t msgType = TDMT_VND_EXPLAIN_RSP;
  return schHandleCallback(param, pMsg, msgType, code);
}

L
Liu Jicong 已提交
1434
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1435
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
L
Liu Jicong 已提交
1436
  qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
1437
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1438 1439
}

L
Liu Jicong 已提交
1440
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1441 1442 1443 1444
  if (code) {
    qError("hb rsp error:%s", tstrerror(code));
    SCH_ERR_RET(code);
  }
L
Liu Jicong 已提交
1445

D
dapan1121 已提交
1446 1447 1448 1449 1450 1451
  SSchedulerHbRsp rsp = {0};
  if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
    qError("invalid hb rsp msg, size:%d", pMsg->len);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

D
dapan1121 已提交
1452 1453
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;

D
dapan1121 已提交
1454 1455 1456
  SSchTrans trans = {0};
  trans.transInst = pParam->transport;
  trans.transHandle = pMsg->handle;
L
Liu Jicong 已提交
1457

D
dapan1121 已提交
1458
  SCH_ERR_RET(schUpdateHbConnection(&rsp.epId, &trans));
D
dapan1121 已提交
1459 1460

  int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
L
Liu Jicong 已提交
1461 1462
  qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
         rsp.epId.ep.port);
D
dapan1121 已提交
1463

D
dapan1121 已提交
1464 1465
  for (int32_t i = 0; i < taskNum; ++i) {
    STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
L
Liu Jicong 已提交
1466

D
dapan1121 已提交
1467 1468
    SSchJob *pJob = schAcquireJob(taskStatus->refId);
    if (NULL == pJob) {
L
Liu Jicong 已提交
1469 1470 1471
      qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
            taskStatus->queryId, taskStatus->taskId);
      // TODO DROP TASK FROM SERVER!!!!
D
dapan1121 已提交
1472 1473
      continue;
    }
L
Liu Jicong 已提交
1474

D
dapan1121 已提交
1475
    // TODO
L
Liu Jicong 已提交
1476 1477 1478

    SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId,
                 jobTaskStatusStr(taskStatus->status));
D
dapan1121 已提交
1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489

    schReleaseJob(taskStatus->refId);
  }

_return:

  tFreeSSchedulerHbRsp(&rsp);

  SCH_RET(code);
}

D
dapan1121 已提交
1490 1491 1492 1493
int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
  rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);

D
dapan1121 已提交
1494 1495
  qDebug("handle %p is broken", pMsg->handle);

D
dapan1121 已提交
1496 1497
  if (head->isHbParam) {
    SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
L
Liu Jicong 已提交
1498
    SSchTrans            trans = {.transInst = hbParam->transport, .transHandle = NULL};
D
dapan1121 已提交
1499 1500 1501 1502 1503 1504 1505 1506 1507 1508
    SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));

    SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId));
  } else {
    SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1509
/*
 * Maps a request message type to the callback that handles its response.
 *
 * msgType: request type (TDMT_VND_* or TDMT_SCH_LINK_BROKEN).
 * fp:      out; receives the matching callback on success and is left
 *          untouched for an unknown type.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR for an unknown type.
 */
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
  __async_send_cb_fn_t handler = NULL;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
      handler = schHandleCreateTableCallback;
      break;
    case TDMT_VND_DROP_TABLE:
      handler = schHandleDropTableCallback;
      break;
    case TDMT_VND_ALTER_TABLE:
      handler = schHandleAlterTableCallback;
      break;
    case TDMT_VND_SUBMIT:
      handler = schHandleSubmitCallback;
      break;
    case TDMT_VND_QUERY:
      handler = schHandleQueryCallback;
      break;
    case TDMT_VND_RES_READY:
      handler = schHandleReadyCallback;
      break;
    case TDMT_VND_EXPLAIN:
      handler = schHandleExplainCallback;
      break;
    case TDMT_VND_FETCH:
      handler = schHandleFetchCallback;
      break;
    case TDMT_VND_DROP_TASK:
      handler = schHandleDropCallback;
      break;
    case TDMT_VND_QUERY_HEARTBEAT:
      handler = schHandleHbCallback;
      break;
    case TDMT_SCH_LINK_BROKEN:
      handler = schHandleLinkBrokenCallback;
      break;
    default:
      qError("unknown msg type for callback, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      return TSDB_CODE_SUCCESS;
  }

  *fp = handler;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1552
/*
 * Allocates an SMsgSendInfo together with its SSchTaskCallbackParam for a task
 * message of the given type.
 *
 * The parameter is filled with the job's query id, ref id and transport and the
 * task's id; the send info receives the parameter and the callback selected by
 * schGetCallbackFp(msgType).
 *
 * pMsgSendInfo: out; set only on success.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure,
 * or the schGetCallbackFp error. Both allocations are released on failure.
 */
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
  int32_t                code = 0;
  SSchTaskCallbackParam *cbParam = NULL;
  __async_send_cb_fn_t   cbFp = NULL;

  SMsgSendInfo *sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == sendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  cbParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == cbParam) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &cbFp));

  cbParam->queryId = pJob->queryId;
  cbParam->refId = pJob->refId;
  cbParam->taskId = SCH_TASK_ID(pTask);
  cbParam->transport = pJob->transport;

  sendInfo->param = cbParam;
  sendInfo->fp = cbFp;

  *pMsgSendInfo = sendInfo;

  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFree(cbParam);
  taosMemoryFree(sendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1589
void schFreeRpcCtxVal(const void *arg) {
D
dapan1121 已提交
1590 1591 1592
  if (NULL == arg) {
    return;
  }
L
Liu Jicong 已提交
1593 1594

  SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
D
dapan1121 已提交
1595 1596
  taosMemoryFreeClear(pMsgSendInfo->param);
  taosMemoryFreeClear(pMsgSendInfo);
D
dapan1121 已提交
1597
}
D
dapan1121 已提交
1598

D
dapan1121 已提交
1599
/*
 * Allocates an SSchTaskCallbackParam populated from the job (query id, ref id,
 * transport) and the task (task id).
 *
 * pParam: out; receives the new parameter on success.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchTaskCallbackParam *taskParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));

  if (NULL == taskParam) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taskParam->transport = pJob->transport;
  taskParam->queryId = pJob->queryId;
  taskParam->refId = pJob->refId;
  taskParam->taskId = SCH_TASK_ID(pTask);

  *pParam = taskParam;

  return TSDB_CODE_SUCCESS;
}

/*
 * Allocates an SSchHbCallbackParam flagged as a heartbeat parameter.
 *
 * The node id and current endpoint come from the task's candidate address at
 * candidateIdx; the transport comes from the job.
 *
 * pParam: out; receives the new parameter on success.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchHbCallbackParam *hbParam = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));

  if (NULL == hbParam) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // the header flag lets consumers tell this apart from a task parameter
  hbParam->head.isHbParam = true;
  hbParam->transport = pJob->transport;

  SQueryNodeAddr *nodeAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  hbParam->nodeEpId.nodeId = nodeAddr->nodeId;
  memcpy(&hbParam->nodeEpId.ep, SCH_GET_CUR_EP(nodeAddr), sizeof(SEp));

  *pParam = hbParam;

  return TSDB_CODE_SUCCESS;
}

/*
 * Fills brokenVal with the value the rpc layer uses when a link breaks: an
 * SMsgSendInfo whose callback is the TDMT_SCH_LINK_BROKEN handler and whose
 * param is a heartbeat parameter (isHb) or a task parameter (!isHb).
 *
 * brokenVal: out; msgType, val, clone and freeFunc are set on success only.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or the error from
 * parameter creation / callback lookup. Allocations are released on failure.
 */
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
  int32_t              code = 0;
  int32_t              msgType = TDMT_SCH_LINK_BROKEN;
  __async_send_cb_fn_t fp = NULL;
  SMsgSendInfo        *pMsgSendInfo = NULL;

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    // return directly: the cleanup below dereferences pMsgSendInfo, which
    // used to crash on this path
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (isHb) {
    SCH_ERR_JRET(schMakeHbCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  } else {
    SCH_ERR_JRET(schMakeTaskCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  pMsgSendInfo->fp = fp;

  brokenVal->msgType = msgType;
  brokenVal->val = pMsgSendInfo;
  brokenVal->clone = schCloneSMsgSendInfo;
  brokenVal->freeFunc = schFreeRpcCtxVal;

  return TSDB_CODE_SUCCESS;

_return:

  // only reached with a non-NULL pMsgSendInfo
  taosMemoryFreeClear(pMsgSendInfo->param);
  taosMemoryFreeClear(pMsgSendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1673
/*
 * Builds the rpc context attached to a TDMT_VND_QUERY request.
 *
 * pCtx->args becomes a hash keyed by response message type holding callback
 * entries for TDMT_VND_RES_READY_RSP and TDMT_VND_EXPLAIN_RSP; pCtx->brokenVal
 * is filled with a task-level broken-link value.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or the error of a
 * helper. On failure the hash and both send infos are released.
 */
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t       code = 0;
  int32_t       msgType = 0;
  SMsgSendInfo *readyInfo = NULL;
  SMsgSendInfo *explainInfo = NULL;
  SRpcCtxVal    ctxVal = {.val = NULL, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &readyInfo));
  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &explainInfo));

  // entry for the res-ready response
  msgType = TDMT_VND_RES_READY_RSP;
  ctxVal.val = readyInfo;
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // entry for the explain response
  msgType = TDMT_VND_EXPLAIN_RSP;
  ctxVal.val = explainInfo;
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(pCtx->args);

  // schFreeRpcCtxVal ignores NULL and frees the send info plus its param
  schFreeRpcCtxVal(readyInfo);
  schFreeRpcCtxVal(explainInfo);

  SCH_RET(code);
}

/*
 * Builds the rpc context attached to a heartbeat request.
 *
 * pCtx->args becomes a hash with one entry keyed by
 * TDMT_VND_QUERY_HEARTBEAT_RSP whose send info carries the heartbeat callback
 * and an SSchHbCallbackParam (node endpoint of the task's current candidate
 * address + job transport); pCtx->brokenVal is filled with a heartbeat-level
 * broken-link value.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or a helper's error.
 * On failure the hash, the parameter and the send info are released.
 *
 * NOTE(review): the parameter's head.isHbParam is left at its calloc'd zero
 * value here, whereas schMakeHbCallbackParam sets it to true — confirm this is
 * intended, since schCloneCallbackParam picks the copy size from that flag.
 */
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t              code = 0;
  int32_t              rspType = TDMT_VND_QUERY_HEARTBEAT_RSP;
  __async_send_cb_fn_t hbFp = NULL;
  SSchHbCallbackParam *hbParam = NULL;
  SMsgSendInfo        *sendInfo = NULL;
  SQueryNodeEpId       nodeEpId = {0};
  SQueryNodeAddr      *nodeAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  nodeEpId.nodeId = nodeAddr->nodeId;
  memcpy(&nodeEpId.ep, SCH_GET_CUR_EP(nodeAddr), sizeof(SEp));

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == sendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  hbParam = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (NULL == hbParam) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // the callback is looked up by request type, the hash key is the response type
  SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &hbFp));

  hbParam->nodeEpId = nodeEpId;
  hbParam->transport = pJob->transport;

  sendInfo->param = hbParam;
  sendInfo->fp = hbFp;

  SRpcCtxVal hbVal = {.val = sendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (taosHashPut(pCtx->args, &rspType, sizeof(rspType), &hbVal, sizeof(hbVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", rspType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(pCtx->args);
  taosMemoryFreeClear(hbParam);
  taosMemoryFreeClear(sendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1779
/*
 * Registers a heartbeat connection entry for epId in schMgmt.hbConnections.
 *
 * A fresh SSchHbTrans (job transport + heartbeat rpc context) is inserted into
 * the hash. When the insert reports that the node already exists, the new rpc
 * context is released, *exist is set to true and success is returned; *exist
 * is not written on any other path.
 *
 * Returns TSDB_CODE_SUCCESS, the schMakeHbRpcCtx error, or the taosHashPut error.
 */
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  SSchHbTrans hbTrans = {0};

  hbTrans.trans.transInst = pJob->transport;

  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hbTrans.rpcCtx));

  int32_t code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hbTrans, sizeof(SSchHbTrans));
  if (0 == code) {
    return TSDB_CODE_SUCCESS;
  }

  // the entry was not stored, so the rpc context built above is still ours
  schFreeRpcCtx(&hbTrans.rpcCtx);

  if (HASH_NODE_EXIST(code)) {
    *exist = true;
    return TSDB_CODE_SUCCESS;
  }

  qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
  SCH_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

/*
 * Makes a heap copy of a callback parameter.
 *
 * The header's isHbParam flag selects the concrete type, and therefore the
 * number of bytes copied: SSchHbCallbackParam when set, SSchTaskCallbackParam
 * otherwise.
 *
 * pDst: out; receives the copy on success.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
  const bool   isHb = pSrc->isHbParam;
  const size_t paramSize = isHb ? sizeof(SSchHbCallbackParam) : sizeof(SSchTaskCallbackParam);

  void *copy = taosMemoryMalloc(paramSize);
  if (NULL == copy) {
    if (isHb) {
      qError("malloc SSchHbCallbackParam failed");
    } else {
      qError("malloc SSchTaskCallbackParam failed");
    }
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(copy, pSrc, paramSize);
  *pDst = (SSchCallbackParamHeader *)copy;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1829 1830
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
  SMsgSendInfo *pSrc = src;
L
Liu Jicong 已提交
1831
  int32_t       code = 0;
D
dapan1121 已提交
1832
  SMsgSendInfo *pDst = taosMemoryMalloc(sizeof(*pSrc));
D
dapan1121 已提交
1833
  if (NULL == pDst) {
D
dapan1121 已提交
1834 1835 1836 1837
    qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1838 1839
  memcpy(pDst, pSrc, sizeof(*pSrc));
  pDst->param = NULL;
D
dapan1121 已提交
1840

D
dapan1121 已提交
1841
  SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
D
dapan1121 已提交
1842

D
dapan1121 已提交
1843
  *dst = pDst;
D
dapan1121 已提交
1844

D
dapan1121 已提交
1845
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1846

D
dapan1121 已提交
1847 1848
_return:

D
dapan1121 已提交
1849
  taosMemoryFreeClear(pDst);
D
dapan1121 已提交
1850 1851 1852 1853 1854 1855 1856
  SCH_RET(code);
}

/*
 * Deep-copies a heartbeat rpc context from pSrc into pDst.
 *
 * The broken-link value and every entry of pSrc->args are copied with their
 * SMsgSendInfo cloned through schCloneSMsgSendInfo; pDst->args is a new hash.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or a clone error.
 * Failures after the broken-link value was cloned release pDst via
 * schFreeRpcCtx.
 *
 * NOTE(review): the error paths inside the loop leave the taosHashIterate
 * iteration without finishing it — confirm whether the hash API requires the
 * iterator to be cancelled in that case.
 */
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
  int32_t code = 0;

  pDst->brokenVal = pSrc->brokenVal;
  pDst->brokenVal.val = NULL;

  SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));

  pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pDst->args) {
    qError("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  for (void *pIter = taosHashIterate(pSrc->args, NULL); NULL != pIter; pIter = taosHashIterate(pSrc->args, pIter)) {
    SRpcCtxVal *srcVal = (SRpcCtxVal *)pIter;
    int32_t    *pMsgType = taosHashGetKey(pIter, NULL);

    // copy the function pointers, but give the entry its own send info
    SRpcCtxVal newVal = *srcVal;
    newVal.val = NULL;

    SCH_ERR_JRET(schCloneSMsgSendInfo(srcVal->val, &newVal.val));

    if (taosHashPut(pDst->args, pMsgType, sizeof(*pMsgType), &newVal, sizeof(newVal))) {
      qError("taosHashPut msg %d to rpcCtx failed", *pMsgType);
      // not stored in the hash, so schFreeRpcCtx below would not see it
      (*newVal.freeFunc)(newVal.val);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  schFreeRpcCtx(pDst);
  SCH_RET(code);
}

L
Liu Jicong 已提交
1894 1895
/*
 * Wraps msg into an SMsgSendInfo with the task callback for msgType and hands
 * it to asyncSendMsgToServerExt.
 *
 * transport:     SSchTrans holding the rpc instance and connection handle.
 * epSet:         target endpoints; the one in use is only logged here.
 * msg/msgSize:   request payload, read as starting with an SMsgHead for logging.
 * persistHandle: forwarded to asyncSendMsgToServerExt.
 * ctx:           optional rpc context, forwarded as is.
 *
 * Returns TSDB_CODE_SUCCESS or the first error; on error the send info and its
 * param are released here, msg is left to the caller.
 */
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet *epSet, int32_t msgType, void *msg,
                        uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
  int32_t       code = 0;
  int64_t       transporterId = 0;
  SMsgSendInfo *sendInfo = NULL;
  SSchTrans    *pTrans = transport;

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, msgType, &sendInfo));

  sendInfo->msgType = msgType;
  sendInfo->msgInfo.pData = msg;
  sendInfo->msgInfo.len = msgSize;
  sendInfo->msgInfo.handle = pTrans->transHandle;

  qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", TMSG_INFO(msgType),
         ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
         pTrans->transInst, pTrans->transHandle);

  SCH_ERR_JRET(asyncSendMsgToServerExt(pTrans->transInst, epSet, &transporterId, sendInfo, persistHandle, ctx));

  SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType));
  return TSDB_CODE_SUCCESS;

_return:

  if (NULL != sendInfo) {
    taosMemoryFreeClear(sendInfo->param);
    taosMemoryFreeClear(sendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1931 1932
/*
 * Serializes a heartbeat request for nodeEpId and sends it over the node's
 * registered heartbeat connection.
 *
 * The connection entry is looked up in schMgmt.hbConnections; its rpc context
 * is cloned and its transport copied under the entry lock, then the request is
 * sent with a persistent handle.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR when no connection entry
 * exists for the node, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation/serialization
 * failure, or the error of a helper. Everything allocated here is released on
 * failure.
 *
 * Fixes relative to the previous version:
 *  - a missing connection entry used to run SCH_ERR_RET with code == 0, which
 *    does not return, and then dereferenced the NULL entry;
 *  - msg, pMsgSendInfo and param are now initialized before the first jump to
 *    _return, so the cleanup never frees indeterminate pointers.
 */
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
  SSchedulerHbReq        req = {0};
  int32_t                code = 0;
  SRpcCtx                rpcCtx = {0};
  SSchTrans              trans = {0};
  int32_t                msgType = TDMT_VND_QUERY_HEARTBEAT;
  int32_t                msgSize = 0;
  void                  *msg = NULL;
  SMsgSendInfo          *pMsgSendInfo = NULL;
  SSchTaskCallbackParam *param = NULL;
  __async_send_cb_fn_t   fp = NULL;

  req.header.vgId = nodeEpId->nodeId;
  req.sId = schMgmt.sId;
  memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));

  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
           nodeEpId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  // snapshot the connection state under its lock; work on the copies afterwards
  SCH_LOCK(SCH_WRITE, &hb->lock);
  code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
  memcpy(&trans, &hb->trans, sizeof(trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  SCH_ERR_RET(code);

  // first pass with a NULL buffer only computes the serialized size
  msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    qError("calloc hb req %d failed", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    qError("calloc SMsgSendInfo failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == param) {
    qError("calloc SSchTaskCallbackParam failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  param->transport = trans.transInst;

  pMsgSendInfo->param = param;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans.transHandle;
  pMsgSendInfo->msgType = msgType;
  pMsgSendInfo->fp = fp;

  int64_t transporterId = 0;
  SEpSet  epSet = {.inUse = 0, .numOfEps = 1};
  memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));

  qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
         nodeEpId->ep.fqdn, nodeEpId->ep.port);

  code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
  if (code) {
    qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
           trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
    SCH_ERR_JRET(code);
  }

  qDebug("hb msg sent");
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFreeClear(msg);
  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);
  schFreeRpcCtx(&rpcCtx);
  SCH_RET(code);
}

D
dapan1121 已提交
2022
/*
 * Builds the request payload for msgType and sends it for the given task.
 *
 * addr:    target node; when NULL the task's candidate address at candidateIdx
 *          is used.
 * msgType: one of the TDMT_VND_* request types handled by the switch below;
 *          anything else yields TSDB_CODE_SCH_INTERNAL_ERROR.
 *
 * TDMT_VND_QUERY and TDMT_VND_QUERY_HEARTBEAT additionally build an rpc context
 * and ask for a persistent handle. After a successful TDMT_VND_QUERY send the
 * execution node is recorded through schRecordTaskExecNode.
 *
 * Returns TSDB_CODE_SUCCESS or the first error. If sending fails, the task's
 * last message type is reset to -1 and the payload and rpc context are freed.
 *
 * Fixes relative to the previous version:
 *  - the heartbeat size is checked in a signed variable; it used to be stored
 *    in the unsigned msgSize first, so a negative serializer result was never
 *    detected and turned into a huge allocation size;
 *  - the rpc context is released when building the payload fails after the
 *    context was created (it used to leak on those early returns);
 *  - the unused local isCandidateAddr is gone.
 */
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
  uint32_t msgSize = 0;
  void    *msg = NULL;
  int32_t  code = 0;
  bool     persistHandle = false;
  SRpcCtx  rpcCtx = {0};

  if (NULL == addr) {
    addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  }

  SEpSet epSet = addr->epSet;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_SUBMIT: {
      // the task already carries the complete request; send a private copy
      msgSize = pTask->msgLen;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      memcpy(msg, pTask->msg, msgSize);
      break;
    }

    case TDMT_VND_QUERY: {
      SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));

      uint32_t len = strlen(pJob->sql);
      msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        schFreeRpcCtx(&rpcCtx);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSubQueryMsg *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      pMsg->taskType = TASK_TYPE_TEMP;
      pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
      pMsg->phyLen = htonl(pTask->msgLen);
      pMsg->sqlLen = htonl(len);

      // trailing data: the sql text first, then the task's msg (phyLen bytes)
      memcpy(pMsg->msg, pJob->sql, len);
      memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);

      persistHandle = true;
      break;
    }

    case TDMT_VND_RES_READY: {
      msgSize = sizeof(SResReadyReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResReadyReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      break;
    }
    case TDMT_VND_FETCH: {
      msgSize = sizeof(SResFetchReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResFetchReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);

      break;
    }
    case TDMT_VND_DROP_TASK: {
      msgSize = sizeof(STaskDropReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      STaskDropReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      break;
    }
    case TDMT_VND_QUERY_HEARTBEAT: {
      SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));

      SSchedulerHbReq req = {0};
      req.sId = schMgmt.sId;
      req.header.vgId = addr->nodeId;
      req.epId.nodeId = addr->nodeId;
      memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

      // keep the serializer result signed so that a negative value is detectable
      int32_t hbMsgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
      if (hbMsgSize < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", hbMsgSize);
        schFreeRpcCtx(&rpcCtx);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgSize = (uint32_t)hbMsgSize;

      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_JOB_ELOG("calloc %d failed", msgSize);
        schFreeRpcCtx(&rpcCtx);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      if (tSerializeSSchedulerHbReq(msg, hbMsgSize, &req) < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", hbMsgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      persistHandle = true;
      break;
    }
    default:
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);

  SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
  // the rpc context is only passed along when one was actually built
  SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
                               (rpcCtx.args ? &rpcCtx : NULL)));

  if (msgType == TDMT_VND_QUERY) {
    SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
  schFreeRpcCtx(&rpcCtx);

  taosMemoryFreeClear(msg);
  SCH_RET(code);
}
D
dapan1121 已提交
2188

D
dapan1121 已提交
2189 2190
/*
 * Makes sure a heartbeat connection entry exists for the node the task is
 * about to use (its candidate address at candidateIdx).
 *
 * When schMgmt.hbConnections has no entry for the node, one is registered and,
 * unless registration reports that the entry already existed, the first
 * heartbeat is sent.
 *
 * Returns TSDB_CODE_SUCCESS or the error from registration / sending.
 */
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *nodeAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  nodeEpId = {0};

  nodeEpId.nodeId = nodeAddr->nodeId;
  memcpy(&nodeEpId.ep, SCH_GET_CUR_EP(nodeAddr), sizeof(SEp));

#if 1
  if (NULL != taosHashGet(schMgmt.hbConnections, &nodeEpId, sizeof(SQueryNodeEpId))) {
    return TSDB_CODE_SUCCESS;
  }

  bool alreadyRegistered = false;
  SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &nodeEpId, &alreadyRegistered));

  if (!alreadyRegistered) {
    SCH_ERR_RET(schBuildAndSendHbMsg(&nodeEpId));
  }
#endif

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
2209

D
dapan1121 已提交
2210
/*
 * Launch a single task: account for it in its level, move it to the executing
 * list, serialize its sub-plan if that has not been done yet, pick candidate
 * addresses and send the plan message.
 *
 * When the job already needs to stop, nothing is sent and the job's current
 * errCode is handed to SCH_RET instead.
 */
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  int8_t  jobStatus = 0;

  // counted before the stop check, so skipped tasks are still counted as launched
  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(jobStatus));

    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  // NOTE: race condition: the task should be put into the hash table before send msg to server
  if (JOB_TASK_STATUS_EXECUTING != SCH_GET_TASK_STATUS(pTask)) {
    SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
  }

  SSubplan *plan = pTask->plan;

  // the serialized plan is cached on the task; build it only once
  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }

    SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  // only query jobs get a heartbeat connection set up before sending
  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));

  return TSDB_CODE_SUCCESS;
}

// Note: callers need no extra error processing; every failure below is routed
// through schProcessOnTaskFailure and its result is what gets returned.
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  bool    quotaOk = false;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    // no flow control for this task: launch right away
    SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
    return TSDB_CODE_SUCCESS;
  }

  // flow controlled: launch only when the quota check reports enough room
  SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &quotaOk));
  if (quotaOk) {
    SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
D
dapan1121 已提交
2275

D
dapan1121 已提交
2276
/*
 * Launch the tasks of one level in index order, stopping at the first launch
 * that reports an error.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  int32_t idx = 0;

  while (idx < level->taskNum) {
    SSchTask *pTask = taosArrayGet(level->subTasks, idx);

    SCH_ERR_RET(schLaunchTask(pJob, pTask));

    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Start executing a job: switch it to the EXECUTING status, evaluate flow
 * control for the level at pJob->levelIdx and launch that level's tasks.
 */
int32_t schLaunchJob(SSchJob *pJob) {
  SSchLevel *startLevel = taosArrayGet(pJob->levels, pJob->levelIdx);

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));
  SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, startLevel));
  SCH_ERR_RET(schLaunchLevelTasks(pJob, startLevel));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2298
/*
 * Send a TDMT_VND_DROP_TASK message to every node recorded in pTask->execNodes.
 * The send result is intentionally not checked; dropping is best effort.
 */
void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosArrayGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  for (int32_t idx = 0; idx < nodeNum; ++idx) {
    SSchNodeInfo *node = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, idx);

    // the drop message has to go out on the handle recorded for this node
    SCH_SET_TASK_HANDLE(pTask, node->handle);

    schBuildAndSendMsg(pJob, pTask, &node->addr, TDMT_VND_DROP_TASK);
  }

  SCH_TASK_DLOG("task has %d exec address", nodeNum);
}

/*
 * Drop every task stored in the given hash list on the nodes it ran on.
 * Does nothing when SCH_IS_NEED_DROP_JOB says the job needs no drop.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_IS_NEED_DROP_JOB(pJob)) {
    return;
  }

  // each hash entry holds a pointer to the task
  for (void *pIter = taosHashIterate(list, NULL); NULL != pIter; pIter = taosHashIterate(list, pIter)) {
    schDropTaskOnExecutedNode(pJob, *(SSchTask **)pIter);
  }
}
H
Haojun Liao 已提交
2336

D
dapan1121 已提交
2337 2338 2339 2340
/*
 * Drop the tasks of all three task lists of the job, in the order
 * executing, succeeded, failed.
 */
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *taskLists[] = {pJob->execTasks, pJob->succTasks, pJob->failTasks};

  for (size_t i = 0; i < sizeof(taskLists) / sizeof(taskLists[0]); ++i) {
    schDropTaskInHashList(pJob, taskLists[i]);
  }
}
2342

D
dapan1121 已提交
2343
/*
 * Cancel a job. Not implemented yet: currently always reports success.
 */
int32_t schCancelJob(SSchJob *pJob) {
  // TODO
  // TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2349
/*
 * Close the job ref set once the scheduler is exiting (schMgmt.exit set) and no
 * job is left. The check and the close happen under the schMgmt write lock.
 */
void schCloseJobRef(void) {
  if (atomic_load_8((int8_t *)&schMgmt.exit)) {
    SCH_LOCK(SCH_WRITE, &schMgmt.lock);

    if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) {
      taosCloseRef(schMgmt.jobRef);
      schMgmt.jobRef = -1;  // mark the ref set as closed
    }

    SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
  }
}

D
dapan1121 已提交
2362 2363 2364 2365 2366 2367 2368
void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
L
Liu Jicong 已提交
2369
  int64_t  refId = pJob->refId;
D
dapan1121 已提交
2370 2371 2372 2373 2374 2375 2376

  if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
    schCancelJob(pJob);
  }

  schDropJobAllTasks(pJob);

L
Liu Jicong 已提交
2377 2378
  pJob->subPlans = NULL;  // it is a reference to pDag->pSubplans

D
dapan1121 已提交
2379
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
L
Liu Jicong 已提交
2380
  for (int32_t i = 0; i < numOfLevels; ++i) {
D
dapan1121 已提交
2381 2382
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

D
dapan1121 已提交
2383
    schFreeFlowCtrl(pLevel);
L
Liu Jicong 已提交
2384

D
dapan1121 已提交
2385
    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
L
Liu Jicong 已提交
2386 2387
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
2388 2389 2390 2391 2392
      schFreeTask(pTask);
    }

    taosArrayDestroy(pLevel->subTasks);
  }
L
Liu Jicong 已提交
2393

D
dapan1121 已提交
2394 2395 2396
  taosHashCleanup(pJob->execTasks);
  taosHashCleanup(pJob->failTasks);
  taosHashCleanup(pJob->succTasks);
L
Liu Jicong 已提交
2397

D
dapan1121 已提交
2398 2399
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
L
Liu Jicong 已提交
2400

D
dapan1121 已提交
2401 2402
  qExplainFreeCtx(pJob->explainCtx);

wafwerar's avatar
wafwerar 已提交
2403 2404
  taosMemoryFreeClear(pJob->resData);
  taosMemoryFreeClear(pJob);
D
dapan1121 已提交
2405

L
Liu Jicong 已提交
2406
  qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
2407 2408 2409 2410

  atomic_sub_fetch_32(&schMgmt.jobNum, 1);

  schCloseJobRef();
D
dapan1121 已提交
2411 2412
}

L
Liu Jicong 已提交
2413
/*
 * Create a job for the query plan and launch it.
 *
 * On success *job receives the job's refId. With syncSchedule set, the call
 * blocks on the job's response semaphore before returning. On failure the job
 * object (if any) is freed and the error code is returned via SCH_RET.
 */
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                              int64_t startTs, bool needRes, bool syncSchedule) {
  int32_t  code = 0;
  SSchJob *pJob = NULL;

  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  // an empty node list is only reported, not rejected
  if (NULL == pNodeList || taosArrayGetSize(pNodeList) <= 0) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
  }

  SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, needRes, syncSchedule));

  SCH_ERR_JRET(schLaunchJob(pJob));

  *job = pJob->refId;

  if (syncSchedule) {
    SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  // pJob may still be NULL here; schFreeJobImpl tolerates that
  schFreeJobImpl(pJob);
  SCH_RET(code);
}
D
dapan1121 已提交
2445

D
dapan1121 已提交
2446
/*
 * Handle a static EXPLAIN: no task is launched. A bare job object is created,
 * the explain result is stored in its resData, and the job is added to the job
 * ref set in the PARTIAL_SUCCEED status. On success *job receives the refId.
 */
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                             bool syncSchedule) {
  int32_t code = 0;

  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->queryId = pDag->queryId;
  pJob->subPlans = pDag->pSubplans;
  pJob->sql = sql;
  pJob->attr.queryJob = true;
  pJob->attr.explainMode = pDag->explainInfo.mode;

  SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));

  int64_t newRefId = taosAddRef(schMgmt.jobRef, pJob);
  if (newRefId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  // from here on the job belongs to the ref set, so this path does not free it directly
  if (NULL == schAcquireJob(newRefId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, newRefId);
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = newRefId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
  *job = pJob->refId;
  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  schFreeJobImpl(pJob);
  SCH_RET(code);
}

D
dapan1121 已提交
2494
/*
 * Initialize the global scheduler state (schMgmt).
 *
 * cfg may be NULL, in which case the default limits are used; zero/non-positive
 * limits in a given cfg are replaced by the defaults as well.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT when the scheduler is already initialized.
 * When a later step fails, everything created so far is rolled back so that
 * schedulerInit can be called again instead of reporting "already initialized".
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  int32_t code = 0;

  if (schMgmt.jobRef >= 0) {
    qError("scheduler already initialized");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (cfg) {
    schMgmt.cfg = *cfg;

    if (schMgmt.cfg.maxJobNum == 0) {
      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    }
    if (schMgmt.cfg.maxNodeTableNum <= 0) {
      schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
    }
  } else {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
  }

  // jobs are released through schFreeJobImpl when their last reference goes away
  schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
  if (schMgmt.jobRef < 0) {
    qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  schMgmt.hbConnections = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == schMgmt.hbConnections) {
    qError("taosHashInit hb connections failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
    // log first: the cleanup below could overwrite errno
    qError("generate schdulerId failed, errno:%d", errno);
    SCH_ERR_JRET(TSDB_CODE_QRY_SYS_ERROR);
  }

  qInfo("scheduler %" PRIx64 " initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);

  return TSDB_CODE_SUCCESS;

_return:

  // roll back partial initialization so that jobRef < 0 again means "not initialized"
  if (schMgmt.hbConnections) {
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
  }

  if (schMgmt.jobRef >= 0) {
    taosCloseRef(schMgmt.jobRef);
    schMgmt.jobRef = -1;
  }

  SCH_RET(code);
}

L
Liu Jicong 已提交
2536
/*
 * Execute a query plan synchronously and hand the outcome back in pRes.
 *
 * transport, pDag, pDag->pSubplans, pJob and pRes must be non-NULL, otherwise
 * TSDB_CODE_QRY_INVALID_INPUT is returned. On success *pJob holds the job refId,
 * pRes->code the job's error code and pRes->numOfRows its row count. When
 * needRes is set, ownership of the job's result data moves to pRes->res.
 */
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
                         int64_t startTs, bool needRes, SQueryResult *pRes) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
    SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
  } else {
    SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, needRes, true));
  }

  SSchJob *job = schAcquireJob(*pJob);
  if (NULL == job) {
    // the job can already be gone from the ref set; do not dereference a NULL job
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, *pJob);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pRes->code = atomic_load_32(&job->errCode);
  pRes->numOfRows = job->resNumOfRows;
  if (needRes) {
    // detach the result from the job so that freeing the job does not free it
    pRes->res = job->resData;
    job->resData = NULL;
  }

  schReleaseJob(*pJob);

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2562
/*
 * Start executing a query plan without waiting for its result.
 * transport, pDag, pDag->pSubplans and pJob must be non-NULL. On success *pJob
 * holds the refId of the created job.
 */
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (EXPLAIN_MODE_STATIC != pDag->explainInfo.mode) {
    // regular job: no start timestamp, no result requested, asynchronous scheduling
    SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false, false));
  } else {
    SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2576
#if 0
X
Xiaoyu Wang 已提交
2577
/*
 * Convert a single-level query plan into an array of STaskInfo, one entry per
 * sub-plan, each carrying a freshly allocated SSubQueryMsg.
 *
 * On success *pTasks receives the array. On failure the partially built array
 * is released with schedulerFreeTaskList and the error code is returned.
 *
 * NOTE(review): this function sits inside an `#if 0` region and is currently
 * not compiled.
 */
int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
  if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = LIST_LENGTH(pDag->pSubplans);
  if (1 != levelNum) {
    qError("invalid level num: %d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SNodeListNode *plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, 0);
  int32_t taskNum = LIST_LENGTH(plans->pNodeList);
  if (taskNum <= 0) {
    qError("invalid task num: %d", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo));
  if (NULL == info) {
    qError("taosArrayInit %d taskInfo failed", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  STaskInfo tInfo = {0};
  char *msg = NULL;
  int32_t msgLen = 0;
  int32_t code = 0;

  for (int32_t i = 0; i < taskNum; ++i) {
    SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, i);
    tInfo.addr = plan->execNode;

    code = qSubPlanToString(plan, &msg, &msgLen);
    if (TSDB_CODE_SUCCESS != code) {
      qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);
      SCH_ERR_JRET(code);
    }

    if (NULL == msg) {
      // nothing to copy from; treated as an allocation failure like before
      qError("subplanToString returned no msg, len:%d", msgLen);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;

    SSubQueryMsg* pMsg = taosMemoryCalloc(1, msgSize);
    if (NULL == pMsg) {
      qError("calloc %d failed", msgSize);
      taosMemoryFreeClear(msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pMsg->header.vgId = tInfo.addr.nodeId;

    pMsg->sId      = schMgmt.sId;
    pMsg->queryId  = plan->id.queryId;
    pMsg->taskId   = schGenUUID();
    pMsg->taskType = TASK_TYPE_PERSISTENT;
    pMsg->phyLen   = msgLen;
    pMsg->sqlLen   = 0;
    memcpy(pMsg->msg, msg, msgLen);

    // the plan string has been copied into pMsg; release it so it does not leak
    // and so that the next qSubPlanToString call starts from a NULL pointer
    taosMemoryFreeClear(msg);

    tInfo.msg = pMsg;

    if (NULL == taosArrayPush(info, &tInfo)) {
      qError("taosArrayPush failed, idx:%d", i);
      // pMsg is not owned by the array yet, so it has to be released here
      taosMemoryFree(pMsg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  *pTasks = info;
  info = NULL;  // ownership moved to the caller; the cleanup below becomes a no-op

_return:
  schedulerFreeTaskList(info);
  SCH_RET(code);
}

D
dapan1121 已提交
2652 2653 2654 2655 2656 2657
/*
 * Build an array with copyNum copies of a task. Every copy gets its own message
 * buffer (a byte copy of src->msg) and a newly generated taskId.
 *
 * On failure the partially filled array is released and *dst is set to NULL.
 *
 * NOTE(review): this function sits inside an `#if 0` region and is currently
 * not compiled.
 */
int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
  if (NULL == src || NULL == dst || copyNum <= 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;

  *dst = taosArrayInit(copyNum, sizeof(STaskInfo));
  if (NULL == *dst) {
    qError("taosArrayInit %d taskInfo failed", copyNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // message header plus the physical plan that follows it
  int32_t msgSize = src->msg->phyLen + sizeof(*src->msg);

  STaskInfo copy = {0};
  copy.addr = src->addr;

  int32_t idx = 0;
  while (idx < copyNum) {
    copy.msg = taosMemoryMalloc(msgSize);
    if (NULL == copy.msg) {
      qError("malloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(copy.msg, src->msg, msgSize);
    copy.msg->taskId = schGenUUID();

    if (NULL == taosArrayPush(*dst, &copy)) {
      qError("taosArrayPush failed, idx:%d", idx);
      // not stored in the array, so the list cleanup below would miss it
      taosMemoryFree(copy.msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    ++idx;
  }

  return TSDB_CODE_SUCCESS;

_return:

  schedulerFreeTaskList(*dst);
  *dst = NULL;

  SCH_RET(code);
}
L
Liu Jicong 已提交
2697
#endif
D
dapan1121 已提交
2698

L
Liu Jicong 已提交
2699
int32_t schedulerFetchRows(int64_t job, void **pData) {
D
dapan1121 已提交
2700
  if (NULL == pData) {
D
dapan1121 已提交
2701
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
2702 2703
  }

L
Liu Jicong 已提交
2704
  int32_t  code = 0;
D
dapan1121 已提交
2705
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2706 2707 2708 2709
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2710

D
dapan1121 已提交
2711 2712
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (status == JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
2713
    SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2714
    schReleaseJob(job);
D
dapan1121 已提交
2715
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
2716 2717
  }

D
dapan1121 已提交
2718
  if (!SCH_JOB_NEED_FETCH(pJob)) {
D
dapan1121 已提交
2719
    SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
D
dapan1121 已提交
2720
    schReleaseJob(job);
D
dapan1121 已提交
2721
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
2722 2723
  }

D
dapan1121 已提交
2724 2725
  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
D
dapan1121 已提交
2726
    schReleaseJob(job);
D
dapan1121 已提交
2727
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
2728 2729
  }

D
dapan1121 已提交
2730
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2731
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2732 2733
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
  } else if (status == JOB_TASK_STATUS_SUCCEED) {
D
dapan1121 已提交
2734
    SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2735 2736
    goto _return;
  } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan1121 已提交
2737
    if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
D
dapan1121 已提交
2738 2739
      SCH_ERR_JRET(schFetchFromRemote(pJob));
      tsem_wait(&pJob->rspSem);
H
Hongze Cheng 已提交
2740
    }
D
dapan1121 已提交
2741 2742 2743
  } else {
    SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan 已提交
2744 2745
  }

D
dapan1121 已提交
2746
  status = SCH_GET_JOB_STATUS(pJob);
D
dapan 已提交
2747

D
dapan1121 已提交
2748
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2749
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2750
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
D
dapan 已提交
2751
  }
L
Liu Jicong 已提交
2752

D
dapan1121 已提交
2753
  if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
D
dapan1121 已提交
2754
    SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
D
dapan 已提交
2755 2756
  }

D
dapan1121 已提交
2757
  while (true) {
D
dapan1121 已提交
2758 2759
    *pData = atomic_load_ptr(&pJob->resData);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
D
dapan1121 已提交
2760 2761 2762 2763 2764
      continue;
    }

    break;
  }
D
dapan 已提交
2765

D
dapan1121 已提交
2766
  if (NULL == *pData) {
wafwerar's avatar
wafwerar 已提交
2767
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
2768 2769 2770 2771 2772
    if (rsp) {
      rsp->completed = 1;
    }

    *pData = rsp;
D
dapan1121 已提交
2773
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
D
dapan1121 已提交
2774
  }
D
dapan1121 已提交
2775

2776
  SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
D
dapan1121 已提交
2777 2778 2779 2780

_return:

  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
L
Liu Jicong 已提交
2781

D
dapan1121 已提交
2782
  schReleaseJob(job);
D
dapan 已提交
2783

D
dapan1121 已提交
2784
  SCH_RET(code);
D
dapan 已提交
2785
}
D
dapan1121 已提交
2786

D
dapan1121 已提交
2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801
/*
 * Append one SQuerySubDesc (task id and status) per task of the job to pSub,
 * walking the levels from the highest index down to 0.
 *
 * Returns TSDB_CODE_SCH_STATUS_ERROR when the job cannot be acquired or is not
 * an initialized, executable job, and TSDB_CODE_QRY_OUT_OF_MEMORY when an entry
 * cannot be appended. The job reference taken here is released on every path.
 */
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
  int32_t  code = 0;
  SSchJob *pJob = schAcquireJob(job);
  if (NULL == pJob) {
    qDebug("acquire job from jobRef list failed, may not started or dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) {
    qDebug("job not initialized or not executable job, refId:%" PRIx64, job);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask     *pTask = taosArrayGet(pLevel->subTasks, m);
      SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};

      if (NULL == taosArrayPush(pSub, &subDesc)) {
        qError("taosArrayPush failed, level:%d, idx:%d", i, m);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

_return:

  // balance the schAcquireJob above, on success and on failure
  schReleaseJob(job);

  SCH_RET(code);
}

D
dapan1121 已提交
2814
int32_t scheduleCancelJob(int64_t job) {
D
dapan1121 已提交
2815
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2816 2817 2818 2819
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2820

D
dapan1121 已提交
2821 2822
  int32_t code = schCancelJob(pJob);

D
dapan1121 已提交
2823
  schReleaseJob(job);
D
dapan1121 已提交
2824 2825

  SCH_RET(code);
D
dapan1121 已提交
2826 2827
}

D
dapan1121 已提交
2828
void schedulerFreeJob(int64_t job) {
D
dapan1121 已提交
2829
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2830
  if (NULL == pJob) {
D
dapan1121 已提交
2831
    qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
D
dapan 已提交
2832 2833
    return;
  }
D
dapan1121 已提交
2834

D
dapan1121 已提交
2835 2836
  if (atomic_load_8(&pJob->userFetch) > 0) {
    schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
2837
  }
D
dapan1121 已提交
2838

D
dapan1121 已提交
2839
  SCH_JOB_DLOG("start to remove job from jobRef list, refId:%" PRIx64, job);
2840

D
dapan1121 已提交
2841 2842
  if (taosRemoveRef(schMgmt.jobRef, job)) {
    SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job);
2843
  }
D
dapan1121 已提交
2844 2845

  schReleaseJob(job);
D
dapan1121 已提交
2846
}
D
dapan1121 已提交
2847 2848 2849 2850 2851 2852 2853 2854 2855

/*
 * Free a task list: the message buffer of every STaskInfo entry, then the array
 * itself. A NULL list is ignored.
 */
void schedulerFreeTaskList(SArray *taskList) {
  if (NULL != taskList) {
    int32_t total = taosArrayGetSize(taskList);

    for (int32_t idx = 0; idx < total; ++idx) {
      STaskInfo *entry = taosArrayGet(taskList, idx);
      taosMemoryFreeClear(entry->msg);
    }

    taosArrayDestroy(taskList);
  }
}
L
Liu Jicong 已提交
2861

D
dapan1121 已提交
2862
void schedulerDestroy(void) {
2863 2864
  atomic_store_8((int8_t *)&schMgmt.exit, 1);

D
dapan1121 已提交
2865
  if (schMgmt.jobRef >= 0) {
D
dapan1121 已提交
2866
    SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
H
Hongze Cheng 已提交
2867
    int64_t  refId = 0;
C
Cary Xu 已提交
2868

D
dapan1121 已提交
2869
    while (pJob) {
D
dapan1121 已提交
2870
      refId = pJob->refId;
C
Cary Xu 已提交
2871 2872 2873
      if (refId == 0) {
        break;
      }
D
dapan1121 已提交
2874
      taosRemoveRef(schMgmt.jobRef, pJob->refId);
L
Liu Jicong 已提交
2875

D
dapan1121 已提交
2876
      pJob = taosIterateRef(schMgmt.jobRef, refId);
D
dapan1121 已提交
2877
    }
D
dapan1121 已提交
2878
  }
D
dapan1121 已提交
2879 2880

  if (schMgmt.hbConnections) {
H
Hongze Cheng 已提交
2881
    void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
D
dapan1121 已提交
2882 2883 2884 2885
    while (pIter != NULL) {
      SSchHbTrans *hb = pIter;
      schFreeRpcCtx(&hb->rpcCtx);
      pIter = taosHashIterate(schMgmt.hbConnections, pIter);
H
Hongze Cheng 已提交
2886
    }
D
dapan1121 已提交
2887 2888 2889
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
  }
D
dapan1121 已提交
2890
}