scheduler.c 87.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "catalog.h"
H
Hongze Cheng 已提交
17
#include "command.h"
L
Liu Jicong 已提交
18
#include "query.h"
D
dapan1121 已提交
19
#include "schedulerInt.h"
H
Hongze Cheng 已提交
20
#include "tmsg.h"
D
dapan1121 已提交
21
#include "tref.h"
D
dapan1121 已提交
22
#include "trpc.h"
23

D
dapan1121 已提交
24
SSchedulerMgmt schMgmt = {
25
    .jobRef = -1,
D
dapan1121 已提交
26
};
D
dapan1121 已提交
27

L
Liu Jicong 已提交
28
/*
 * Look up the job registered under refId in the schMgmt.jobRef pool and take a
 * reference on it. Callers in this file treat a NULL result as failure.
 */
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
  void *pJobRef = taosAcquireRef(schMgmt.jobRef, refId);
  return (SSchJob *)pJobRef;
}
D
dapan1121 已提交
29

L
Liu Jicong 已提交
30
/* Drop a reference taken with schAcquireJob(); returns taosReleaseRef()'s result unchanged. */
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
  int32_t releaseCode = taosReleaseRef(schMgmt.jobRef, refId);
  return releaseCode;
}
D
dapan1121 已提交
31

L
Liu Jicong 已提交
32
/* Produce a new task id by atomically adding 1 to schMgmt.taskId and returning the add-then-fetch result. */
uint64_t schGenTaskId(void) {
  uint64_t nextId = atomic_add_fetch_64(&schMgmt.taskId, 1);
  return nextId;
}
D
dapan1121 已提交
33

L
Liu Jicong 已提交
34
#if 0
/*
 * Disabled (never compiled): former 64-bit id generator, kept for reference.
 * Bit layout of the result, high to low:
 *   [63..52] 12 bits of a MurmurHash3_32 of the system UUID
 *   [51..40] 12 bits of the process id
 *   [39..16] 24 bits of the millisecond timestamp
 *   [15..0]  16 bits of a per-process counter
 */
uint64_t schGenUUID(void) {
  static uint64_t hashId = 0;
  static int32_t  requestSerialId = 0;

  if (0 == hashId) {
    char    uid[64];
    int32_t code = taosGetSystemUUID(uid, tListLen(uid));
    if (TSDB_CODE_SUCCESS == code) {
      hashId = MurmurHash3_32(uid, strlen(uid));
    } else {
      qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
  }

  int64_t  ts = taosGetTimestampMs();
  uint64_t pid = taosGetPId();
  int32_t  val = atomic_add_fetch_32(&requestSerialId, 1);

  uint64_t hashBits = (hashId & 0x0FFF) << 52;
  uint64_t pidBits = (pid & 0x0FFF) << 40;
  uint64_t tsBits = (ts & 0xFFFFFF) << 16;
  uint64_t seqBits = val & 0xFFFF;

  return hashBits | pidBits | tsBits | seqBits;
}
#endif
D
dapan1121 已提交
57

L
Liu Jicong 已提交
58 59 60
/*
 * Initialize pTask for subplan pPlan on level pLevel.
 *
 * Stores the plan and level, marks the task NOT_START, assigns a fresh id from
 * schGenTaskId() and allocates the execNodes array (SSchNodeInfo entries,
 * initial capacity SCH_MAX_CANDIDATE_EP_NUM).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when execNodes
 *         cannot be allocated.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  pTask->plan = pPlan;
  pTask->level = pLevel;
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
  pTask->taskId = schGenTaskId();

  pTask->execNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo));
  if (pTask->execNodes != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
72
/*
 * Create and register a scheduler job for the query plan pDag.
 *
 * Allocates the job, duplicates the optional node list, builds levels and
 * tasks via schValidateAndBuildJob(), starts explain processing for explain
 * jobs, creates the exec/succ/fail task hashes, registers the job in
 * schMgmt.jobRef, increments schMgmt.jobNum and acquires one reference.
 *
 * @param pSchJob      [out] receives the job on success.
 * @param pDag         query plan to schedule.
 * @param transport    RPC transport handle stored on the job.
 * @param pNodeList    optional candidate node list; duplicated, may be NULL.
 * @param sql          SQL text; only the pointer is stored.
 * @param startTs      start timestamp handed to qExecExplainBegin().
 * @param syncSchedule stored in pJob->attr.syncSchedule.
 * @return TSDB_CODE_SUCCESS or an error code. On error the job is freed with
 *         schFreeJobImpl() before registration, or removed from the ref pool
 *         after it (NOTE(review): presumably the pool's free callback then
 *         frees it — confirm).
 */
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
                   int64_t startTs, bool syncSchedule) {
  int32_t  code = 0;
  int64_t  refId = -1;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->attr.syncSchedule = syncSchedule;
  pJob->transport = transport;
  pJob->sql = sql;

  if (pNodeList != NULL) {
    pJob->nodeList = taosArrayDup(pNodeList);
    if (NULL == pJob->nodeList) {
      // Without this check an allocation failure silently drops the candidate
      // nodes and surfaces much later as "no available execNode".
      SCH_JOB_ELOG("taosArrayDup nodeList failed, size:%d", (int32_t)taosArrayGetSize(pNodeList));
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs));
  }

  pJob->execTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->succTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->succTasks) {
    SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->failTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->failTasks) {
    SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&pJob->rspSem, 0, 0);

  refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_NOT_START;

  *pSchJob = pJob;

  return TSDB_CODE_SUCCESS;

_return:

  // Before registration the job is still exclusively ours; afterwards it is owned by the ref pool.
  if (refId < 0) {
    schFreeJobImpl(pJob);
  } else {
    taosRemoveRef(schMgmt.jobRef, refId);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
153 154 155 156 157 158 159 160
/*
 * Release the values held by an RPC context (the SRpcCtx itself is not freed).
 *
 * Calls each SRpcCtxVal's freeFunc on its val, destroys the args hash, then
 * frees brokenVal.val through brokenVal.freeFunc. A NULL pCtx is a no-op, and
 * entries without a freeFunc are skipped.
 */
void schFreeRpcCtx(SRpcCtx *pCtx) {
  if (NULL == pCtx) {
    return;
  }

  void *pIter = taosHashIterate(pCtx->args, NULL);
  while (pIter) {
    SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;

    // Same guard as for brokenVal below: an entry without a destructor has nothing to free.
    if (ctxVal->freeFunc) {
      (*ctxVal->freeFunc)(ctxVal->val);
    }

    pIter = taosHashIterate(pCtx->args, pIter);
  }

  taosHashCleanup(pCtx->args);
  // pCtx outlives this call; do not leave it pointing at the destroyed hash.
  pCtx->args = NULL;

  if (pCtx->brokenVal.freeFunc) {
    (*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
  }
}

L
Liu Jicong 已提交
173
/*
 * Release the resources owned by pTask: candidate addresses, the msg buffer,
 * child/parent link arrays and exec-node records. The SSchTask itself is not
 * freed. Each released pointer is reset to NULL, matching the
 * taosMemoryFreeClear() used for msg, so later NULL checks on these fields and
 * repeated calls never touch freed memory.
 */
void schFreeTask(SSchTask *pTask) {
  if (pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
    pTask->candidateAddrs = NULL;
  }

  taosMemoryFreeClear(pTask->msg);

  if (pTask->children) {
    taosArrayDestroy(pTask->children);
    pTask->children = NULL;
  }

  if (pTask->parents) {
    taosArrayDestroy(pTask->parents);
    pTask->parents = NULL;
  }

  if (pTask->execNodes) {
    taosArrayDestroy(pTask->execNodes);
    pTask->execNodes = NULL;
  }
}

D
dapan1121 已提交
193 194 195 196 197 198
/*
 * Report whether the job should stop doing work.
 *
 * Returns true when the job's current status is FAILED, CANCELLED, CANCELLING,
 * DROPPING or SUCCEED. The status that was read is also stored in *pStatus
 * when pStatus is non-NULL.
 */
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t curStatus = SCH_GET_JOB_STATUS(pJob);
  if (NULL != pStatus) {
    *pStatus = curStatus;
  }

  switch (curStatus) {
    case JOB_TASK_STATUS_FAILED:
    case JOB_TASK_STATUS_CANCELLED:
    case JOB_TASK_STATUS_CANCELLING:
    case JOB_TASK_STATUS_DROPPING:
    case JOB_TASK_STATUS_SUCCEED:
      return true;
    default:
      return false;
  }
}

D
dapan1121 已提交
204
/*
 * Check that a response of type msgType is acceptable for pTask.
 *
 * The response is compared with the last request type recorded for the task
 * (SCH_GET_TASK_LASTMSG_TYPE; -1 once a response has been accepted) and with
 * the task status, which must be EXECUTING or PARTIAL_SUCCEED. On acceptance
 * the recorded type is reset to -1. The expected request type defaults to
 * msgType - 1.
 *
 * Per message type:
 * - TDMT_SCH_LINK_BROKEN, TDMT_VND_EXPLAIN_RSP: always accepted, state untouched.
 * - TDMT_VND_QUERY_RSP: mismatches are only logged at debug level, because a
 *   query response may be processed after the ready response.
 * - TDMT_VND_RES_READY_RSP: must answer TDMT_VND_QUERY, or nothing recorded.
 * - TDMT_VND_FETCH_RSP: must answer msgType - 1, or nothing recorded.
 * - create/drop/alter table and submit responses: must answer msgType - 1.
 * - anything else: rejected with TSDB_CODE_QRY_INVALID_INPUT.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_STATUS_ERROR on a type or status
 *         conflict, or TSDB_CODE_QRY_INVALID_INPUT for an unknown type.
 */
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
  int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
  int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  int32_t reqMsgType = msgType - 1;
  switch (msgType) {
    case TDMT_SCH_LINK_BROKEN:
    case TDMT_VND_EXPLAIN_RSP:
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_QUERY_RSP:  // query_rsp may be processed later than ready_rsp
      if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
        SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
                      TMSG_INFO(msgType));
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_RES_READY_RSP:
      reqMsgType = TDMT_VND_QUERY;
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
                      (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_FETCH_RSP:
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_CREATE_TABLE_RSP:
    case TDMT_VND_DROP_TABLE_RSP:
    case TDMT_VND_ALTER_TABLE_RSP:
    case TDMT_VND_SUBMIT_RSP:
      break;
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (lastMsgType != reqMsgType) {
    // lastMsgType is -1 when no request is recorded; keep that sentinel out of
    // TMSG_INFO(), exactly as the RES_READY_RSP branch above does.
    SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
                  (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
    SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                  TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);

  return TSDB_CODE_SUCCESS;
}

/*
 * Atomically move the job from its current status to newStatus.
 *
 * Allowed transitions:
 *   NULL                        -> NOT_START
 *   NOT_START                   -> EXECUTING
 *   EXECUTING                   -> PARTIAL_SUCCEED, FAILED, CANCELLING, CANCELLED, DROPPING
 *   PARTIAL_SUCCEED             -> FAILED, SUCCEED, DROPPING
 *   SUCCEED, FAILED, CANCELLING -> DROPPING
 * A job that is already CANCELLED or DROPPING yields TSDB_CODE_QRY_JOB_FREED.
 * Any other request, including newStatus equal to the current status or an
 * unknown current status, yields TSDB_CODE_QRY_APP_ERROR.
 *
 * When the compare-and-swap loses a race with a concurrent update, the status
 * is re-read and the transition is validated again.
 */
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    if (newStatus == oriStatus) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        if (JOB_TASK_STATUS_NOT_START != newStatus) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;
      case JOB_TASK_STATUS_NOT_START:
        if (JOB_TASK_STATUS_EXECUTING != newStatus) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;
      case JOB_TASK_STATUS_EXECUTING: {
        bool allowed = (newStatus == JOB_TASK_STATUS_PARTIAL_SUCCEED || newStatus == JOB_TASK_STATUS_FAILED ||
                        newStatus == JOB_TASK_STATUS_CANCELLING || newStatus == JOB_TASK_STATUS_CANCELLED ||
                        newStatus == JOB_TASK_STATUS_DROPPING);
        if (!allowed) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;
      }
      case JOB_TASK_STATUS_PARTIAL_SUCCEED: {
        bool allowed = (newStatus == JOB_TASK_STATUS_FAILED || newStatus == JOB_TASK_STATUS_SUCCEED ||
                        newStatus == JOB_TASK_STATUS_DROPPING);
        if (!allowed) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;
      }
      case JOB_TASK_STATUS_SUCCEED:
      case JOB_TASK_STATUS_FAILED:
      case JOB_TASK_STATUS_CANCELLING:
        if (JOB_TASK_STATUS_DROPPING != newStatus) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;
      case JOB_TASK_STATUS_CANCELLED:
      case JOB_TASK_STATUS_DROPPING:
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;
      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    if (atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus) == oriStatus) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      return TSDB_CODE_SUCCESS;
    }

    // Another thread changed the status between the read and the CAS; re-validate against the new value.
  }

_return:

  SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  SCH_ERR_RET(code);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
359
/*
 * Link every task to its child and parent tasks.
 *
 * For each task, the SSubplan pointers in its plan's pChildren / pParents
 * lists are resolved to SSchTask pointers through planToTask (keyed by the
 * subplan pointer) and appended to pTask->children / pTask->parents.
 *
 * Structural checks:
 * - tasks on level pJob->levelIdx (the last level) must have no children;
 * - tasks on level 0 must have no parents, tasks on every other level must have some;
 * - a query job may have at most one task on level 0.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR for an inconsistent
 *         plan, or TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  // The level-0 lookup below needs at least one level; do not rely solely on the caller having checked.
  if (pJob->levelNum <= 0) {
    SCH_JOB_ELOG("invalid query plan, levelNum:%d", pJob->levelNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) {
    SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
441
/*
 * Remember the candidate address at pTask->candidateIdx as the node the task
 * succeeded on (copied into pTask->succeedAddr).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when no candidate
 *         address exists at that index.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (pAddr != NULL) {
    pTask->succeedAddr = *pAddr;
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                (int32_t)taosArrayGetSize(pTask->candidateAddrs));
  SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
454 455
/*
 * Append an (addr, handle) record to pTask->execNodes, the list of nodes the
 * task has been sent to. addr is copied by value.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the push fails.
 */
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) {
  SSchNodeInfo nodeInfo = {.handle = handle, .addr = *addr};

  void *pPushed = taosArrayPush(pTask->execNodes, &nodeInfo);
  if (pPushed == NULL) {
    SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_DLOG("task execNode recorded, handle:%p", handle);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
466

X
Xiaoyu Wang 已提交
467
/*
 * Validate pDag and build the job's level/task structure from it.
 *
 * Sets pJob->queryId, levels (one SSchLevel per entry of pDag->pSubplans),
 * levelNum, levelIdx (= levelNum - 1), subPlans and taskNum. It creates one
 * SSchTask per subplan via schInitTask(), then wires parent/child links with
 * schBuildTaskRalation(). The temporary plan-pointer -> task-pointer map is
 * always freed before returning.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for an empty or
 *         malformed plan, TSDB_CODE_QRY_OUT_OF_MEMORY, or an error from
 *         schInitTask() / schBuildTaskRalation().
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SHashObj *planToTask = taosHashInit(
      SCHEDULE_DEFAULT_MAX_TASK_NUM,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  pJob->subPlans = pDag->pSubplans;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = &task;  // referenced by the SCH_TASK_*LOG macros

      SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));

      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        // The task never reached pLevel->subTasks, so nothing else would release
        // what schInitTask() allocated for it.
        schFreeTask(&task);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:
  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
575 576
/*
 * Build pTask->candidateAddrs, the list of nodes the task may be sent to.
 *
 * Does nothing if the list already exists. Otherwise it resets candidateIdx to
 * 0 and fills the list from one of two sources:
 * - the plan's own execNode, when it carries at least one endpoint;
 * - otherwise up to SCH_MAX_CANDIDATE_EP_NUM entries of pJob->nodeList.
 *
 * On failure the partially built list is destroyed and candidateAddrs is reset
 * to NULL. Otherwise the early "already exists" check would make every later
 * call report success with an empty or incomplete list.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or
 *         TSDB_CODE_QRY_INVALID_INPUT when no candidate node is available.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  int32_t addNum = 0;
  int32_t nodeNum = 0;
  if (pJob->nodeList) {
    nodeNum = (int32_t)taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroy(pTask->candidateAddrs);
  pTask->candidateAddrs = NULL;

  SCH_RET(code);
}
D
dapan1121 已提交
631

D
dapan1121 已提交
632
/*
 * Insert pTask into the job's execTasks hash, keyed by its taskId.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR if the task is
 *         already present, or TSDB_CODE_QRY_OUT_OF_MEMORY on any other
 *         taosHashPut() failure.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == code) {
    SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(code)) {
    SCH_TASK_ELOG("task already in execTask list, code:%x", code);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
649 650
/*
 * Move pTask from the job's execTasks hash to its succTasks hash.
 *
 * A missing execTasks entry is only logged as a warning. *moved is set to
 * true when the task ends up in succTasks (including when it was already
 * there) and false otherwise.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_STATUS_ERROR if the task was
 *         already in succTasks, or TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Give the out flag a defined value on every path (as schMoveTaskToFailList
  // does); it used to be left untouched when taosHashPut failed.
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
675 676
/*
 * Move pTask from the job's execTasks hash to its failTasks hash.
 *
 * *moved starts as false and becomes true once the task is in failTasks
 * (including when it was already there). A missing execTasks entry is only
 * logged as a warning.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_STATUS_ERROR if the task was
 *         already in failTasks, or TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  int32_t rmCode = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (rmCode != 0) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == code) {
    *moved = true;
    SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(code)) {
    *moved = true;
    SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
702 703
/*
 * Move pTask back from the job's succTasks hash to its execTasks hash.
 *
 * A missing succTasks entry is only logged as a warning. *moved is set to
 * true when the task ends up in execTasks (including when it was already
 * there) and false otherwise.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_STATUS_ERROR if the task was
 *         already in execTasks, or TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Give the out flag a defined value on every path (as schMoveTaskToFailList
  // does); it used to be left untouched when taosHashPut failed.
  *moved = false;

  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
727
// Decide whether a failed task should be retried and store the decision in *needRetry.
//
// Every call counts as one attempt, because pTask->tryTimes is incremented. No retry is
// scheduled in any of these cases:
//   - the job should stop (schJobNeedToStop);
//   - REQUEST_MAX_TRY_TIMES has been reached;
//   - errCode is not retryable per NEED_SCHEDULER_RETRY_ERROR;
//   - the task has no further endpoint (data-source task) or candidate address (other
//     tasks) left to try.
// Always returns TSDB_CODE_SUCCESS.
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int8_t jobStatus = 0;

  pTask->tryTimes++;
  *needRetry = false;

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(jobStatus));
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) {
    SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  // TODO: check epList / candidateList
  if (!SCH_IS_DATA_SRC_TASK(pTask)) {
    int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
    if (pTask->candidateIdx + 1 >= candidateNum) {
      SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                    pTask->candidateIdx, candidateNum);
      return TSDB_CODE_SUCCESS;
    }
  } else if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
    SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes,
                  SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
    return TSDB_CODE_SUCCESS;
  }

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

// Re-launch a task that schTaskCheckSetRetry approved for another attempt.
//
// Steps:
//   1. Decrement the level's taskLaunchedNum.
//   2. When flow control applies, call schDecTaskFlowQuota and then
//      schLaunchTasksInFlowCtrlList.
//   3. Advance to the next endpoint (data-source task) or candidate address (other tasks).
//   4. Launch the task again.
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  if (!SCH_IS_DATA_SRC_TASK(pTask)) {
    pTask->candidateIdx++;
  } else {
    SCH_SWITCH_EPSET(&pTask->plan->execNode);
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
793
// Record the transport instance and handle of the heartbeat connection for epId.
//
// The endpoint must already have an entry in schMgmt.hbConnections; otherwise the
// function returns TSDB_CODE_QRY_APP_ERROR. The copy is done under the entry's write lock.
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (hb == NULL) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  memcpy(&hb->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId,
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829
// Record errCode as the job's error code.
//
// Rules:
//   - A zero errCode is ignored.
//   - If the job has no error yet, errCode is installed with a compare-and-swap.
//   - Otherwise, an existing error for which NEED_CLIENT_HANDLE_ERROR holds is kept.
//   - Failing that, errCode replaces the current error only if NEED_CLIENT_HANDLE_ERROR
//     holds for errCode itself.
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SUCCESS == errCode) {
    return;
  }

  int32_t curCode = atomic_load_32(&pJob->errCode);
  if (TSDB_CODE_SUCCESS == curCode) {
    int32_t seen = atomic_val_compare_exchange_32(&pJob->errCode, curCode, errCode);
    if (seen == curCode) {
      SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
      return;
    }

    // The CAS lost: errCode changed concurrently, so re-read it before deciding.
    curCode = atomic_load_32(&pJob->errCode);
  }

  bool keepCurrent = NEED_CLIENT_HANDLE_ERROR(curCode) || !NEED_CLIENT_HANDLE_ERROR(errCode);
  if (keepCurrent) {
    return;
  }

  atomic_store_32(&pJob->errCode, errCode);
  SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
}

// Common failure path for a job.
//
// Steps:
//   1. Request the transition to `status`.
//   2. Record errCode.
//   3. Post rspSem when a user fetch is pending or the job is scheduled synchronously.
//   4. Return the job's resulting errCode.
//
// If schCheckAndUpdateJobStatus rejects the transition (e.g. the job is already FAILED),
// its error is returned and nothing else happens.
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));

  schUpdateJobErrCode(pJob, errCode);

  bool hasWaiter = atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule;
  if (hasWaiter) {
    tsem_post(&pJob->rspSem);
  }

  int32_t jobErrCode = atomic_load_32(&pJob->errCode);
  SCH_JOB_DLOG("job failed with error: %s", tstrerror(jobErrCode));

  SCH_RET(jobErrCode);
}

D
dapan1121 已提交
860
// Note: task errors are fully handled inside this function; callers need no further task error processing.
D
dapan1121 已提交
861 862 863 864 865 866 867 868 869
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  // Fail path that requests JOB_TASK_STATUS_FAILED for the job.
  int32_t code = schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode);
  SCH_RET(code);
}

// Note: errors are fully handled inside this function; callers need no further error processing.
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  // Fail path that requests JOB_TASK_STATUS_DROPPING for the job.
  int32_t code = schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode);
  SCH_RET(code);
}

D
dapan1121 已提交
870
// Note: task errors are fully handled inside this function; callers need no further task error processing.
D
dapan1121 已提交
871 872
// Move the job to PARTIAL_SUCCEED and wake whoever is waiting on it.
//
// - For synchronously scheduled jobs, rspSem is posted.
// - If pJob->userFetch is already set, the remote fetch is started.
// - A fetch error fails the job through schProcessOnJobFailure.
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  int32_t code = TSDB_CODE_SUCCESS;

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED));

  if (pJob->attr.syncSchedule) {
    tsem_post(&pJob->rspSem);
  }

  if (!atomic_load_8(&pJob->userFetch)) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schFetchFromRemote(pJob));
  return TSDB_CODE_SUCCESS;

_return:
  SCH_RET(schProcessOnJobFailure(pJob, code));
}

891
// Called when fetched data is available on the job.
// Resets the remoteFetch in-flight flag (1 -> 0) and posts rspSem.
void schProcessOnDataFetched(SSchJob *job) {
  // Only clears the flag if it is set; the previous value is not needed.
  (void)atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  tsem_post(&job->rspSem);
}

D
dapan1121 已提交
896
// Note: task errors are fully handled inside this function; callers need no further task error processing.
D
dapan1121 已提交
897
// Handle a failed task.
//
// - When schJobNeedToStop reports the job should stop, nothing is done and the job's
//   errCode is returned.
// - When schTaskCheckSetRetry allows it, the task is retried.
// - Otherwise the task, which must be EXECUTING, is moved to failTasks and marked FAILED.
//   For wait-all jobs the failure is counted in the task's level. While other tasks of
//   that level are still outstanding, errCode is returned without failing the job.
// - Every remaining path fails the job with the original errCode.
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  int8_t  jobStatus = 0;
  bool    needRetry = false;
  bool    moved = false;
  int32_t code = 0;

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(jobStatus));
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
    SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);

  if (SCH_IS_WAIT_ALL_JOB(pJob)) {
    SCH_LOCK(SCH_WRITE, &pTask->level->lock);
    pTask->level->taskFailed++;
    int32_t doneNum = pTask->level->taskSucceed + pTask->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

    schUpdateJobErrCode(pJob, errCode);

    if (doneNum < pTask->level->taskNum) {
      SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", doneNum, pTask->level->taskNum);
      SCH_RET(errCode);
    }
  }

_return:
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
}

D
dapan1121 已提交
950
// Note: task errors are fully handled inside this function; callers need no further task error processing.
D
dapan1121 已提交
951
// Handle a task that finished successfully.
//
// Common steps: move the task to succTasks, mark it PARTIAL_SUCCEED, record its
// succeeding node, and call schLaunchTasksInFlowCtrlList.
//
// Task with parents: each parent's subplan is given this task's output location.
// A parent is launched once SCH_TASK_READY_TO_LUNCH holds for it.
//
// Root task (no parents):
//   - Wait-all jobs count the task in its level. Once every task of the level is done,
//     the job is finished as failed (if any task failed) or as partially succeeded.
//   - Other jobs make this task the fetch task (resNode / fetchTask), move it back to
//     execTasks and mark the job partially succeeded.
//
// Errors from the bookkeeping steps fail the whole job via schProcessOnJobFailure.
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  bool    moved = false;
  int32_t code = 0;

  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
  SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
  SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = (NULL == pTask->parents) ? 0 : (int32_t)taosArrayGetSize(pTask->parents);

  if (parentNum > 0) {
    for (int32_t idx = 0; idx < parentNum; ++idx) {
      SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, idx);
      int32_t   readyNum = atomic_add_fetch_32(&par->childReady, 1);

      SCH_LOCK(SCH_WRITE, &par->lock);
      SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
                                      .taskId = pTask->taskId,
                                      .schedId = schMgmt.sId,
                                      .addr = pTask->succeedAddr};
      qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
      SCH_UNLOCK(SCH_WRITE, &par->lock);

      if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
        SCH_ERR_RET(schLaunchTaskImpl(pJob, par));
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_WAIT_ALL_JOB(pJob)) {
    SCH_LOCK(SCH_WRITE, &pTask->level->lock);
    pTask->level->taskSucceed++;
    int32_t doneNum = pTask->level->taskSucceed + pTask->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

    if (doneNum < pTask->level->taskNum) {
      SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", doneNum, pTask->level->taskNum);
      return TSDB_CODE_SUCCESS;
    }

    if (doneNum > pTask->level->taskNum) {
      SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", doneNum, pTask->level->taskNum);
    }

    if (pTask->level->taskFailed > 0) {
      SCH_RET(schProcessOnJobFailure(pJob, 0));
    }

    SCH_RET(schProcessOnJobPartialSuccess(pJob));
  }

  pJob->resNode = pTask->succeedAddr;
  pJob->fetchTask = pTask;

  SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved));
  SCH_RET(schProcessOnJobPartialSuccess(pJob));

_return:
  SCH_RET(schProcessOnJobFailure(pJob, code));
}
D
dapan 已提交
1029

D
dapan1121 已提交
1030 1031 1032
// Note: errors are fully handled inside this function; callers need no further error processing.
// Send TDMT_VND_FETCH for the job's fetch task, unless a fetch is already in flight or a
// result is already cached in pJob->resData.
//
// pJob->remoteFetch works as an in-flight flag. It is switched 0 -> 1 here and reset to 0
// whenever no message ends up being sent (cached result or send failure). A send failure
// is handed to schProcessOnTaskFailure for the fetch task.
int32_t schFetchFromRemote(SSchJob *pJob) {
  int32_t code = 0;

  int32_t prevFlag = atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1);
  if (prevFlag != 0) {
    SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
    return TSDB_CODE_SUCCESS;
  }

  void *cachedRes = atomic_load_ptr(&pJob->resData);
  if (NULL == cachedRes) {
    SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
    return TSDB_CODE_SUCCESS;
  }

  atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
  SCH_JOB_DLOG("res already fetched, res:%p", cachedRes);
  return TSDB_CODE_SUCCESS;

_return:
  atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}

D
dapan1121 已提交
1058 1059
// Publish a finished explain result and wake any fetch waiter.
//
// - The result becomes the job's fetch result: resType, resNumOfRows and resData are set.
// - The task is marked SUCCEED.
// - Waiters are woken through schProcessOnDataFetched.
// Always returns TSDB_CODE_SUCCESS.
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  // numOfRows is byte-swapped with htonl before use.
  uint32_t numOfRows = htonl(pRsp->numOfRows);
  SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", numOfRows, pRsp->completed);

  pJob->resType = SCH_RES_TYPE_FETCH;
  atomic_store_32(&pJob->resNumOfRows, numOfRows);
  atomic_store_ptr(&pJob->resData, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}

1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093
// Append the table version info carried by a res-ready response to pJob->resData.
//
// - pJob->resData is created lazily as an SArray of STbVerInfo with capacity pJob->taskNum.
// - Responses with an empty tbFName are ignored.
// - Returns TSDB_CODE_OUT_OF_MEMORY if the array cannot be created or appended to.
int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) {
  if (0 == rsp->tbFName[0]) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pJob->resData) {
    pJob->resData = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
    if (NULL == pJob->resData) {
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  STbVerInfo tbInfo = {0};
  // tbFName comes off the wire and may not be NUL-terminated, so copy with a bound.
  // tbInfo is zero-initialized and at most size-1 bytes are copied, so the result is
  // always terminated.
  // NOTE(review): assumes rsp->tbFName is at least as large as tbInfo.tbFName (both look
  // like TSDB_TABLE_FNAME_LEN arrays) — confirm.
  strncpy(tbInfo.tbFName, rsp->tbFName, sizeof(tbInfo.tbFName) - 1);
  tbInfo.sversion = rsp->sversion;
  tbInfo.tversion = rsp->tversion;

  if (NULL == taosArrayPush((SArray *)pJob->resData, &tbInfo)) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1094
// Note: task errors are fully handled inside this function; callers need no further task error processing.
L
Liu Jicong 已提交
1095 1096
// Dispatch one RPC response for pTask according to msgType.
//
// Parameters:
//   msg / msgSize - the response payload (may be NULL).
//   rspCode       - the transport-level result code.
//
// If schJobNeedToStop reports the job should stop, the response is ignored and the job's
// errCode is returned. Every error detected here is routed to schProcessOnTaskFailure:
// rspCode, decode failures, per-item error codes inside batch responses, and invalid or
// unexpected messages. Callers therefore need no further task error processing.
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
                             int32_t rspCode) {
  int32_t code = 0;
  int8_t  status = 0;

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
                  rspCode);
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE_RSP: {
      SVCreateTbBatchRsp batchRsp = {0};
      if (msg) {
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
            SVCreateTbRsp *rsp = batchRsp.pRsps + i;
            if (TSDB_CODE_SUCCESS != rsp->code) {
              code = rsp->code;
              tDecoderClear(&coder);
              SCH_ERR_JRET(code);
            }
          }
        }
        tDecoderClear(&coder);
        SCH_ERR_JRET(code);
      }

      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
    case TDMT_VND_DROP_TABLE_RSP: {
      SVDropTbBatchRsp batchRsp = {0};
      if (msg) {
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
            SVDropTbRsp *rsp = batchRsp.pRsps + i;
            if (TSDB_CODE_SUCCESS != rsp->code) {
              code = rsp->code;
              tDecoderClear(&coder);
              SCH_ERR_JRET(code);
            }
          }
        }
        tDecoderClear(&coder);
        SCH_ERR_JRET(code);
      }

      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
    case TDMT_VND_ALTER_TABLE_RSP: {
      SVAlterTbRsp rsp = {0};
      if (msg) {
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSVAlterTbRsp(&coder, &rsp);
        tDecoderClear(&coder);
        SCH_ERR_JRET(code);
        SCH_ERR_JRET(rsp.code);
      }

      SCH_ERR_JRET(rspCode);

      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
    case TDMT_VND_SUBMIT_RSP: {
      SCH_ERR_JRET(rspCode);

      if (msg) {
        SDecoder    coder = {0};
        SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp));
        if (NULL == rsp) {
          SCH_TASK_ELOG("malloc submitRsp failed, size:%d", (int32_t)sizeof(*rsp));
          SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
        // Start from a zeroed struct so tFreeSSubmitRsp never sees garbage pointers if
        // decoding stops part-way.
        memset(rsp, 0, sizeof(*rsp));

        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSSubmitRsp(&coder, rsp);
        if (code) {
          SCH_TASK_ELOG("decode submitRsp failed, code:%d", code);
          tFreeSSubmitRsp(rsp);
          SCH_ERR_JRET(code);
        }

        if (rsp->nBlocks > 0) {
          for (int32_t i = 0; i < rsp->nBlocks; ++i) {
            SSubmitBlkRsp *blk = rsp->pBlocks + i;
            if (TSDB_CODE_SUCCESS != blk->code) {
              code = blk->code;
              tFreeSSubmitRsp(rsp);
              SCH_ERR_JRET(code);
            }
          }
        }

        atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
        SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);

        pJob->resType = SCH_RES_TYPE_QUERY;
        SCH_LOCK(SCH_WRITE, &pJob->resLock);
        if (pJob->resData) {
          // Merge into the accumulated response. The realloc result goes into a temporary,
          // so on failure the accumulated blocks are neither leaked nor overwritten with NULL.
          SSubmitRsp *sum = pJob->resData;
          int32_t     totalBlocks = sum->nBlocks + rsp->nBlocks;
          void       *blocks = taosMemoryRealloc(sum->pBlocks, totalBlocks * sizeof(*sum->pBlocks));
          if (NULL == blocks && totalBlocks > 0) {
            SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
            SCH_TASK_ELOG("realloc submit blocks failed, blockNum:%d", totalBlocks);
            tFreeSSubmitRsp(rsp);
            SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
          }
          sum->pBlocks = blocks;
          if (rsp->nBlocks > 0) {
            // memcpy with a NULL source is undefined even for zero bytes, hence the guard.
            memcpy(sum->pBlocks + sum->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
          }
          sum->nBlocks = totalBlocks;
          sum->affectedRows += rsp->affectedRows;
          // The block entries were shallow-copied into sum; free only the containers.
          taosMemoryFree(rsp->pBlocks);
          taosMemoryFree(rsp);
        } else {
          pJob->resData = rsp;
        }
        SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
      }

      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

      break;
    }
    case TDMT_VND_QUERY_RSP: {
      SQueryTableRsp rsp = {0};
      if (msg) {
        SCH_ERR_JRET(tDeserializeSQueryTableRsp(msg, msgSize, &rsp));
        SCH_ERR_JRET(rsp.code);
      }

      SCH_ERR_JRET(rspCode);

      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      // SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));

      break;
    }
    case TDMT_VND_RES_READY_RSP: {
      SResReadyRsp *rsp = (SResReadyRsp *)msg;

      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
      // The payload is read in place as a raw struct; reject short messages instead of
      // reading past the end of the buffer.
      if (msgSize < (int32_t)sizeof(*rsp)) {
        SCH_TASK_ELOG("invalid res ready rsp size, msgSize:%d, expected:%d", msgSize, (int32_t)sizeof(*rsp));
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
      SCH_ERR_JRET(rsp->code);
      pJob->resType = SCH_RES_TYPE_QUERY;

      SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));

      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

      break;
    }
    case TDMT_VND_EXPLAIN_RSP: {
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (!SCH_IS_EXPLAIN_JOB(pJob)) {
        SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (pJob->resData) {
        SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SExplainRsp rsp = {0};
      if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
        taosMemoryFree(rsp.subplanInfo);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SRetrieveTableRsp *pRsp = NULL;
      SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));

      if (pRsp) {
        SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
      }
      break;
    }
    case TDMT_VND_FETCH_RSP: {
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;

      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (SCH_IS_EXPLAIN_JOB(pJob)) {
        if (rsp->completed) {
          SRetrieveTableRsp *pRsp = NULL;
          SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
          if (pRsp) {
            SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
          }

          return TSDB_CODE_SUCCESS;
        }

        atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

        SCH_ERR_JRET(schFetchFromRemote(pJob));

        return TSDB_CODE_SUCCESS;
      }

      if (pJob->resData) {
        SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData);
        taosMemoryFreeClear(rsp);
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      atomic_store_ptr(&pJob->resData, rsp);
      atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));

      if (rsp->completed) {
        SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
      }

      SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

      schProcessOnDataFetched(pJob);
      break;
    }
    case TDMT_VND_DROP_TASK_RSP: {
      // SHOULD NEVER REACH HERE
      SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
    }
    case TDMT_SCH_LINK_BROKEN:
      SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
      break;
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

D
dapan1121 已提交
1353
// Look up taskId in pTaskList. If it is found with a non-NULL entry, store it in *pTask.
// *pTask is left untouched when the list is empty or the task is absent.
// Always returns TSDB_CODE_SUCCESS.
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
  int32_t listSize = taosHashGetSize(pTaskList);
  if (listSize > 0) {
    SSchTask **ppFound = taosHashGet(pTaskList, &taskId, sizeof(taskId));
    if (ppFound != NULL && *ppFound != NULL) {
      *pTask = *ppFound;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1369
// Store `handle` on the task's execution node.
// This happens only when rspCode is 0 and execNodes holds exactly one entry.
// Always returns TSDB_CODE_SUCCESS.
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
  if (0 == rspCode && NULL != pTask->execNodes && 1 == taosArrayGetSize(pTask->execNodes)) {
    SSchNodeInfo *nodeInfo = taosArrayGet(pTask->execNodes, 0);
    nodeInfo->handle = handle;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1381
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
L
Liu Jicong 已提交
1382
  int32_t                code = 0;
D
dapan1121 已提交
1383
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
X
Xiaoyu Wang 已提交
1384
  SSchTask              *pTask = NULL;
L
Liu Jicong 已提交
1385

D
dapan1121 已提交
1386
  SSchJob *pJob = schAcquireJob(pParam->refId);
D
dapan1121 已提交
1387
  if (NULL == pJob) {
D
dapan1121 已提交
1388
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64,
L
Liu Jicong 已提交
1389
          pParam->queryId, pParam->taskId, pParam->refId);
D
dapan1121 已提交
1390
    SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
1391 1392
  }

D
dapan1121 已提交
1393 1394 1395 1396 1397
  schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
  if (NULL == pTask) {
    if (TDMT_VND_EXPLAIN_RSP == msgType) {
      schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
    } else {
H
Hongze Cheng 已提交
1398 1399
      SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                   pParam->taskId);
D
dapan1121 已提交
1400 1401
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
D
dapan1121 已提交
1402
  }
H
Hongze Cheng 已提交
1403

D
dapan1121 已提交
1404
  if (NULL == pTask) {
H
Hongze Cheng 已提交
1405 1406
    SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                 pParam->taskId);
D
dapan1121 已提交
1407 1408
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }
D
dapan1121 已提交
1409

D
dapan1121 已提交
1410
  SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
L
Liu Jicong 已提交
1411

L
Liu Jicong 已提交
1412
  SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
D
dapan1121 已提交
1413
  schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
D
dapan1121 已提交
1414
  SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
D
dapan1121 已提交
1415

H
Haojun Liao 已提交
1416
_return:
D
dapan1121 已提交
1417
  if (pJob) {
D
dapan1121 已提交
1418
    schReleaseJob(pParam->refId);
D
dapan1121 已提交
1419 1420
  }

wafwerar's avatar
wafwerar 已提交
1421
  taosMemoryFreeClear(param);
D
dapan1121 已提交
1422 1423 1424
  SCH_RET(code);
}

L
Liu Jicong 已提交
1425
int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1426 1427
  return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
H
Haojun Liao 已提交
1428

L
Liu Jicong 已提交
1429
int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
H
Haojun Liao 已提交
1430 1431 1432
  return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}

X
Xiaoyu Wang 已提交
1433 1434 1435 1436
// RPC callback for drop-table responses; forwards to schHandleCallback as
// TDMT_VND_DROP_TABLE_RSP.
int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_DROP_TABLE_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}

X
Xiaoyu Wang 已提交
1437 1438 1439 1440
// RPC callback for alter-table responses; forwards to schHandleCallback as
// TDMT_VND_ALTER_TABLE_RSP.
int32_t schHandleAlterTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_ALTER_TABLE_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}

L
Liu Jicong 已提交
1441
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1442 1443
  return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
H
Haojun Liao 已提交
1444

L
Liu Jicong 已提交
1445
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1446 1447
  return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
H
Haojun Liao 已提交
1448

L
Liu Jicong 已提交
1449
int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1450 1451
  return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
H
Haojun Liao 已提交
1452

D
dapan1121 已提交
1453 1454 1455 1456
// Response callback for TDMT_VND_EXPLAIN requests; forwards to the shared response handler
// tagged with the explain response type.
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_EXPLAIN_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}

L
Liu Jicong 已提交
1457
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1458
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
L
Liu Jicong 已提交
1459
  qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
1460
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1461 1462
}

L
Liu Jicong 已提交
1463
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1464 1465 1466
  SSchedulerHbRsp rsp = {0};
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;

D
dapan1121 已提交
1467 1468
  if (code) {
    qError("hb rsp error:%s", tstrerror(code));
D
dapan1121 已提交
1469
    SCH_ERR_JRET(code);
D
dapan1121 已提交
1470
  }
L
Liu Jicong 已提交
1471

D
dapan1121 已提交
1472 1473
  if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
    qError("invalid hb rsp msg, size:%d", pMsg->len);
D
dapan1121 已提交
1474
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
1475 1476
  }

D
dapan1121 已提交
1477 1478 1479
  SSchTrans trans = {0};
  trans.transInst = pParam->transport;
  trans.transHandle = pMsg->handle;
L
Liu Jicong 已提交
1480

D
dapan1121 已提交
1481
  SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
D
dapan1121 已提交
1482 1483

  int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
L
Liu Jicong 已提交
1484 1485
  qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
         rsp.epId.ep.port);
D
dapan1121 已提交
1486

D
dapan1121 已提交
1487 1488
  for (int32_t i = 0; i < taskNum; ++i) {
    STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
L
Liu Jicong 已提交
1489

D
dapan1121 已提交
1490 1491
    SSchJob *pJob = schAcquireJob(taskStatus->refId);
    if (NULL == pJob) {
L
Liu Jicong 已提交
1492 1493 1494
      qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
            taskStatus->queryId, taskStatus->taskId);
      // TODO DROP TASK FROM SERVER!!!!
D
dapan1121 已提交
1495 1496
      continue;
    }
L
Liu Jicong 已提交
1497

D
dapan1121 已提交
1498
    // TODO
L
Liu Jicong 已提交
1499 1500 1501

    SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId,
                 jobTaskStatusStr(taskStatus->status));
D
dapan1121 已提交
1502 1503 1504 1505 1506 1507 1508

    schReleaseJob(taskStatus->refId);
  }

_return:

  tFreeSSchedulerHbRsp(&rsp);
D
dapan1121 已提交
1509
  taosMemoryFree(param);
D
dapan1121 已提交
1510 1511 1512 1513

  SCH_RET(code);
}

D
dapan1121 已提交
1514 1515 1516 1517
int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
  rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);

D
dapan1121 已提交
1518 1519
  qDebug("handle %p is broken", pMsg->handle);

D
dapan1121 已提交
1520 1521
  if (head->isHbParam) {
    SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
L
Liu Jicong 已提交
1522
    SSchTrans            trans = {.transInst = hbParam->transport, .transHandle = NULL};
D
dapan1121 已提交
1523 1524 1525 1526 1527 1528 1529 1530 1531 1532
    SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));

    SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId));
  } else {
    SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1533
// Maps an outgoing request type to the callback that handles its response.
// *fp is written only when msgType is known. An unknown type is logged and
// TSDB_CODE_QRY_APP_ERROR is returned.
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
  __async_send_cb_fn_t cb = NULL;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
      cb = schHandleCreateTableCallback;
      break;
    case TDMT_VND_DROP_TABLE:
      cb = schHandleDropTableCallback;
      break;
    case TDMT_VND_ALTER_TABLE:
      cb = schHandleAlterTableCallback;
      break;
    case TDMT_VND_SUBMIT:
      cb = schHandleSubmitCallback;
      break;
    case TDMT_VND_QUERY:
      cb = schHandleQueryCallback;
      break;
    case TDMT_VND_RES_READY:
      cb = schHandleReadyCallback;
      break;
    case TDMT_VND_EXPLAIN:
      cb = schHandleExplainCallback;
      break;
    case TDMT_VND_FETCH:
      cb = schHandleFetchCallback;
      break;
    case TDMT_VND_DROP_TASK:
      cb = schHandleDropCallback;
      break;
    case TDMT_VND_QUERY_HEARTBEAT:
      cb = schHandleHbCallback;
      break;
    case TDMT_SCH_LINK_BROKEN:
      cb = schHandleLinkBrokenCallback;
      break;
    default:
      break;
  }

  if (NULL == cb) {
    qError("unknown msg type for callback, msgType:%d", msgType);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  *fp = cb;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1576
// Allocates the SMsgSendInfo, plus its SSchTaskCallbackParam, that routes the response of a
// msgType request for pTask back to the matching scheduler callback.
// On success both allocations are handed to the caller through *pMsgSendInfo.
// On failure both are freed and *pMsgSendInfo is left untouched.
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
  int32_t                code = 0;
  SSchTaskCallbackParam *cbParam = NULL;
  __async_send_cb_fn_t   cbFp = NULL;

  SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  cbParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == cbParam) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &cbFp));

  cbParam->queryId = pJob->queryId;
  cbParam->refId = pJob->refId;
  cbParam->taskId = SCH_TASK_ID(pTask);
  cbParam->transport = pJob->transport;

  pInfo->param = cbParam;
  pInfo->fp = cbFp;
  *pMsgSendInfo = pInfo;

  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFree(cbParam);
  taosMemoryFree(pInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1613
void schFreeRpcCtxVal(const void *arg) {
D
dapan1121 已提交
1614 1615 1616
  if (NULL == arg) {
    return;
  }
L
Liu Jicong 已提交
1617 1618

  SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
D
dapan1121 已提交
1619 1620
  taosMemoryFreeClear(pMsgSendInfo->param);
  taosMemoryFreeClear(pMsgSendInfo);
D
dapan1121 已提交
1621
}
D
dapan1121 已提交
1622

D
dapan1121 已提交
1623
// Allocates a task callback param carrying the job's query id, ref id and transport plus
// pTask's id, and returns it through *pParam. The caller owns the allocation.
int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchTaskCallbackParam *cbParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (cbParam == NULL) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  cbParam->transport = pJob->transport;
  cbParam->taskId = SCH_TASK_ID(pTask);
  cbParam->refId = pJob->refId;
  cbParam->queryId = pJob->queryId;

  *pParam = cbParam;
  return TSDB_CODE_SUCCESS;
}

// Allocates a heartbeat callback param and returns it through *pParam; the caller owns it.
// The param is flagged isHbParam and addresses pTask's current candidate node, using the
// node id and the current endpoint of that node's epSet.
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchHbCallbackParam *hbParam = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (NULL == hbParam) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  hbParam->head.isHbParam = true;
  hbParam->transport = pJob->transport;
  hbParam->nodeEpId.nodeId = addr->nodeId;
  memcpy(&hbParam->nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

  *pParam = hbParam;
  return TSDB_CODE_SUCCESS;
}

// Fills *brokenVal with the callback that runs when the rpc link for pTask breaks.
// The callback value is a freshly allocated SMsgSendInfo whose param is a heartbeat param
// (isHb) or a task param, routed to the TDMT_SCH_LINK_BROKEN handler.
// The value is cloned with schCloneSMsgSendInfo and freed with schFreeRpcCtxVal.
// On failure nothing is leaked and *brokenVal is left unmodified.
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
  int32_t              code = 0;
  int32_t              msgType = TDMT_SCH_LINK_BROKEN;
  __async_send_cb_fn_t fp = NULL;

  SMsgSendInfo *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    // Nothing allocated yet: return directly instead of unwinding through a NULL pointer.
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (isHb) {
    SCH_ERR_JRET(schMakeHbCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  } else {
    SCH_ERR_JRET(schMakeTaskCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  pMsgSendInfo->fp = fp;

  brokenVal->msgType = msgType;
  brokenVal->val = pMsgSendInfo;
  brokenVal->clone = schCloneSMsgSendInfo;
  brokenVal->freeFunc = schFreeRpcCtxVal;

  return TSDB_CODE_SUCCESS;

_return:
  if (pMsgSendInfo) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1697
// Builds the rpc context attached to a TDMT_VND_QUERY request.
//   - args: a hash keyed by response msg type, holding callback values for
//     TDMT_VND_RES_READY_RSP and TDMT_VND_EXPLAIN_RSP.
//   - brokenVal: a task-flavoured link-broken callback.
// On failure every allocation made here is released, and pCtx->args is reset to NULL so the
// caller never sees a pointer to a destroyed hash.
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t       code = 0;
  SMsgSendInfo *pReadyMsgSendInfo = NULL;
  SMsgSendInfo *pExplainMsgSendInfo = NULL;

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo));
  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));

  int32_t    msgType = TDMT_VND_RES_READY_RSP;
  SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  msgType = TDMT_VND_EXPLAIN_RSP;
  ctxVal.val = pExplainMsgSendInfo;
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(pCtx->args);
  // Do not leave the caller's ctx pointing at the hash that was just destroyed.
  pCtx->args = NULL;

  if (pReadyMsgSendInfo) {
    taosMemoryFreeClear(pReadyMsgSendInfo->param);
    taosMemoryFreeClear(pReadyMsgSendInfo);
  }

  if (pExplainMsgSendInfo) {
    taosMemoryFreeClear(pExplainMsgSendInfo->param);
    taosMemoryFreeClear(pExplainMsgSendInfo);
  }

  SCH_RET(code);
}

// Builds the rpc context used for heartbeat requests to pTask's current candidate node.
//   - args: maps TDMT_VND_QUERY_HEARTBEAT_RSP to a heartbeat callback value.
//   - brokenVal: a heartbeat-flavoured link-broken callback.
// On failure every allocation made here is released, and pCtx->args is reset to NULL so the
// caller never sees a pointer to a destroyed hash.
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t              code = 0;
  SSchHbCallbackParam *param = NULL;
  SMsgSendInfo        *pMsgSendInfo = NULL;
  SQueryNodeAddr      *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId       epId = {0};

  epId.nodeId = addr->nodeId;
  memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (NULL == param) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  int32_t              msgType = TDMT_VND_QUERY_HEARTBEAT_RSP;
  __async_send_cb_fn_t fp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));

  param->nodeEpId = epId;
  param->transport = pJob->transport;

  pMsgSendInfo->param = param;
  pMsgSendInfo->fp = fp;

  SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(pCtx->args);
  // Do not leave the caller's ctx pointing at the hash that was just destroyed.
  pCtx->args = NULL;
  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1803
// Registers a heartbeat connection for epId in schMgmt.hbConnections, using pJob's transport
// and a fresh heartbeat rpc context.
// If another entry for epId won the race, the new context is released, *exist is set to true
// and success is returned. Any other insertion failure is logged and returned.
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  SSchHbTrans hbTrans = {0};
  hbTrans.trans.transInst = pJob->transport;

  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hbTrans.rpcCtx));

  int32_t code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hbTrans, sizeof(SSchHbTrans));
  if (TSDB_CODE_SUCCESS != code) {
    // The hash did not take ownership of the context, so drop it here.
    schFreeRpcCtx(&hbTrans.rpcCtx);

    if (!HASH_NODE_EXIST(code)) {
      qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
      SCH_ERR_RET(code);
    }

    *exist = true;
  }

  return TSDB_CODE_SUCCESS;
}

// Duplicates a callback param into a new allocation returned through *pDst (owned by the
// caller). The header's isHbParam flag selects the concrete type (SSchHbCallbackParam or
// SSchTaskCallbackParam) and therefore how many bytes are copied. The copy is a plain memcpy.
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
  bool   isHb = pSrc->isHbParam;
  size_t copyLen = isHb ? sizeof(SSchHbCallbackParam) : sizeof(SSchTaskCallbackParam);

  void *pCopy = taosMemoryMalloc(copyLen);
  if (NULL == pCopy) {
    if (isHb) {
      qError("malloc SSchHbCallbackParam failed");
    } else {
      qError("malloc SSchTaskCallbackParam failed");
    }
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(pCopy, pSrc, copyLen);
  *pDst = (SSchCallbackParamHeader *)pCopy;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1853 1854
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
  SMsgSendInfo *pSrc = src;
L
Liu Jicong 已提交
1855
  int32_t       code = 0;
D
dapan1121 已提交
1856
  SMsgSendInfo *pDst = taosMemoryMalloc(sizeof(*pSrc));
D
dapan1121 已提交
1857
  if (NULL == pDst) {
D
dapan1121 已提交
1858 1859 1860 1861
    qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1862 1863
  memcpy(pDst, pSrc, sizeof(*pSrc));
  pDst->param = NULL;
D
dapan1121 已提交
1864

D
dapan1121 已提交
1865
  SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
D
dapan1121 已提交
1866

D
dapan1121 已提交
1867
  *dst = pDst;
D
dapan1121 已提交
1868

D
dapan1121 已提交
1869
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1870

D
dapan1121 已提交
1871 1872
_return:

D
dapan1121 已提交
1873
  taosMemoryFreeClear(pDst);
D
dapan1121 已提交
1874 1875 1876 1877 1878 1879 1880
  SCH_RET(code);
}

// Deep-copies a heartbeat rpc context into *pDst.
// The brokenVal and every value in args get their own SMsgSendInfo from schCloneSMsgSendInfo.
// On failure the partially built *pDst is released with schFreeRpcCtx.
// If the failure happens mid-iteration, the iteration over pSrc->args is cancelled so the
// iterator's hold on the current node is released.
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
  int32_t code = 0;
  memcpy(&pDst->brokenVal, &pSrc->brokenVal, sizeof(pSrc->brokenVal));
  pDst->brokenVal.val = NULL;

  SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));

  pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pDst->args) {
    qError("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcCtxVal dst = {0};
  void      *pIter = taosHashIterate(pSrc->args, NULL);
  while (pIter) {
    SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
    int32_t    *msgType = taosHashGetKey(pIter, NULL);

    dst = *pVal;
    dst.val = NULL;

    code = schCloneSMsgSendInfo(pVal->val, &dst.val);
    if (TSDB_CODE_SUCCESS != code) {
      taosHashCancelIterate(pSrc->args, pIter);
      SCH_ERR_JRET(code);
    }

    if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) {
      qError("taosHashPut msg %d to rpcCtx failed", *msgType);
      (*dst.freeFunc)(dst.val);
      taosHashCancelIterate(pSrc->args, pIter);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pIter = taosHashIterate(pSrc->args, pIter);
  }

  return TSDB_CODE_SUCCESS;

_return:

  schFreeRpcCtx(pDst);
  SCH_RET(code);
}

L
Liu Jicong 已提交
1918 1919
// Sends msg (msgSize bytes, typically a buffer this file allocates) of type msgType for pTask
// to epSet, asynchronously.
// `transport` is an SSchTrans; its transHandle, if any, is the connection to reuse.
// The response is routed through a callback handle from schGenerateTaskCallBackAHandle.
// persistHandle and ctx are passed straight to asyncSendMsgToServerExt.
// On failure the callback handle is freed here; msg is left for the caller to release.
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet *epSet, int32_t msgType, void *msg,
                        uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
  int32_t code = 0;

  SSchTrans *trans = (SSchTrans *)transport;

  SMsgSendInfo *pMsgSendInfo = NULL;
  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, msgType, &pMsgSendInfo));

  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans->transHandle;
  pMsgSendInfo->msgType = msgType;

  // ", " separator restored between refId and instance (was printed as "refId:…instance:").
  qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 ", instance:%p, handle:%p", TMSG_INFO(msgType),
         ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
         trans->transInst, trans->transHandle);

  int64_t transporterId = 0;
  code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
  if (code) {
    SCH_ERR_JRET(code);
  }

  SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType));
  return TSDB_CODE_SUCCESS;

_return:

  if (pMsgSendInfo) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1955 1956
// Serializes a heartbeat request for nodeEpId and sends it over the connection registered in
// schMgmt.hbConnections. The connection's rpc context is cloned under its lock, so the send
// does not race with updates to the registered entry.
// Returns an error if no connection is registered for nodeEpId.
// On any failure after the clone, every buffer allocated here and the cloned context are
// released.
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
  SSchedulerHbReq req = {0};
  int32_t         code = 0;
  SRpcCtx         rpcCtx = {0};
  SSchTrans       trans = {0};
  int32_t         msgType = TDMT_VND_QUERY_HEARTBEAT;
  // Declared (and NULLed) up front: the error path below frees them, and an early jump to
  // _return must never see indeterminate pointers.
  void                  *msg = NULL;
  SMsgSendInfo          *pMsgSendInfo = NULL;
  SSchTaskCallbackParam *param = NULL;

  req.header.vgId = nodeEpId->nodeId;
  req.sId = schMgmt.sId;
  memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));

  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
           nodeEpId->ep.port);
    // `code` is still 0 here, so SCH_ERR_RET(code) would fall through to a NULL dereference.
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
  memcpy(&trans, &hb->trans, sizeof(trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  SCH_ERR_RET(code);

  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    qError("calloc hb req %d failed", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    qError("calloc SMsgSendInfo failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == param) {
    qError("calloc SSchTaskCallbackParam failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  __async_send_cb_fn_t fp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  param->transport = trans.transInst;

  pMsgSendInfo->param = param;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans.transHandle;
  pMsgSendInfo->msgType = msgType;
  pMsgSendInfo->fp = fp;

  int64_t transporterId = 0;
  SEpSet  epSet = {.inUse = 0, .numOfEps = 1};
  memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));

  qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
         nodeEpId->ep.fqdn, nodeEpId->ep.port);

  code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
  if (code) {
    qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
           trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
    SCH_ERR_JRET(code);
  }

  qDebug("hb msg sent");
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFreeClear(msg);
  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);
  schFreeRpcCtx(&rpcCtx);
  SCH_RET(code);
}

D
dapan1121 已提交
2046
// Builds the request body for msgType and sends it for pTask.
// The target is addr, or pTask's current candidate address when addr is NULL.
//   - Table DDL and submit: copy the task's prepared message.
//   - TDMT_VND_QUERY: prepends the SQL text, attaches a query rpc context and keeps the handle.
//   - RES_READY / FETCH / DROP_TASK: small fixed-size requests.
//   - Heartbeat: a serialized SSchedulerHbReq with a heartbeat rpc context.
// On failure the task's last message type is reset to -1, and any rpc context or buffer built
// here is released.
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
  uint32_t msgSize = 0;
  void    *msg = NULL;
  int32_t  code = 0;
  bool     persistHandle = false;
  SRpcCtx  rpcCtx = {0};

  if (NULL == addr) {
    addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  }

  SEpSet epSet = addr->epSet;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_SUBMIT: {
      msgSize = pTask->msgLen;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      memcpy(msg, pTask->msg, msgSize);
      break;
    }

    case TDMT_VND_QUERY: {
      SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));

      uint32_t len = strlen(pJob->sql);
      msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        // rpcCtx is already built: unwind through _return so it is released.
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSubQueryMsg *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      pMsg->taskType = TASK_TYPE_TEMP;
      pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
      pMsg->phyLen = htonl(pTask->msgLen);
      pMsg->sqlLen = htonl(len);

      memcpy(pMsg->msg, pJob->sql, len);
      memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);

      persistHandle = true;
      break;
    }

    case TDMT_VND_RES_READY: {
      msgSize = sizeof(SResReadyReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResReadyReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      break;
    }
    case TDMT_VND_FETCH: {
      msgSize = sizeof(SResFetchReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResFetchReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);

      break;
    }
    case TDMT_VND_DROP_TASK: {
      msgSize = sizeof(STaskDropReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      STaskDropReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      break;
    }
    case TDMT_VND_QUERY_HEARTBEAT: {
      SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));

      SSchedulerHbReq req = {0};
      req.sId = schMgmt.sId;
      req.header.vgId = addr->nodeId;
      req.epId.nodeId = addr->nodeId;
      memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

      // The serializer signals failure with a negative length. Keep it signed until it has been
      // checked: msgSize is unsigned, so `msgSize < 0` could never be true.
      int32_t hbLen = tSerializeSSchedulerHbReq(NULL, 0, &req);
      if (hbLen < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", hbLen);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgSize = (uint32_t)hbLen;

      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_JOB_ELOG("calloc %d failed", msgSize);
        // rpcCtx is already built: unwind through _return so it is released.
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      persistHandle = true;
      break;
    }
    default:
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);

  SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
  SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
                               (rpcCtx.args ? &rpcCtx : NULL)));

  if (msgType == TDMT_VND_QUERY) {
    SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
  schFreeRpcCtx(&rpcCtx);

  taosMemoryFreeClear(msg);
  SCH_RET(code);
}
D
dapan1121 已提交
2212

D
dapan1121 已提交
2213 2214
// Makes sure a heartbeat connection exists for pTask's current candidate endpoint.
// If none is registered yet, it registers one and sends the first heartbeat. The heartbeat is
// skipped when a concurrent registration for the same endpoint won the race.
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  nodeEpId = {0};

  nodeEpId.nodeId = addr->nodeId;
  memcpy(&nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

  if (NULL != taosHashGet(schMgmt.hbConnections, &nodeEpId, sizeof(SQueryNodeEpId))) {
    return TSDB_CODE_SUCCESS;
  }

  bool alreadyRegistered = false;
  SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &nodeEpId, &alreadyRegistered));
  if (alreadyRegistered) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schBuildAndSendHbMsg(&nodeEpId));
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
2233

D
dapan1121 已提交
2234
/*
 * Launch a single task. The flow-control decision is made by the caller,
 * schLaunchTask().
 *
 * Steps, each aborting with its error code on failure:
 *   1. count the launch in the task's level (taskLaunchedNum);
 *   2. if the job has to stop, return the job's recorded errCode;
 *   3. unless already EXECUTING, push the task onto the job's exec list and
 *      mark it EXECUTING;
 *   4. serialize the subplan into pTask->msg if that has not been done yet;
 *   5. set the candidate addresses, ensure a heartbeat connection (query jobs
 *      only), and send the subplan's message type.
 */
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  int8_t jobStatus = 0;

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(jobStatus));
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  // The task must be in the exec hash before its message is sent to the
  // server; registering it afterwards would be a race.
  if (JOB_TASK_STATUS_EXECUTING != SCH_GET_TASK_STATUS(pTask)) {
    SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
  }

  SSubplan *subplan = pTask->plan;

  // Serialize only once: an already-populated pTask->msg is reused as is.
  if (pTask->msg == NULL) {  // TODO add more detailed reason for failure
    int32_t planCode = qSubPlanToString(subplan, &pTask->msg, &pTask->msgLen);
    if (planCode != TSDB_CODE_SUCCESS) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(planCode), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(planCode);
    }

    SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, subplan->msgType));

  return TSDB_CODE_SUCCESS;
}

/*
 * Launch a task, honouring per-level flow control.
 *
 * The task handle is reset first. When flow control applies to the task, the
 * flow quota is checked and the task is launched only if the quota is
 * sufficient; otherwise nothing is launched here and success is returned.
 *
 * Any failure is routed to schProcessOnTaskFailure(), whose result is
 * returned. Error processing is therefore done here, and callers need no
 * further handling.
 */
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  bool    launchNow = true;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    bool enough = false;
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
    launchNow = enough;
  }

  if (launchNow) {
    SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;

_return:
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
D
dapan1121 已提交
2299

D
dapan1121 已提交
2300
/*
 * Launch every task of one level, in array order. Stops at, and returns, the
 * first error reported by schLaunchTask().
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  int32_t taskIdx = 0;

  while (taskIdx < level->taskNum) {
    SCH_ERR_RET(schLaunchTask(pJob, taosArrayGet(level->subTasks, taskIdx)));
    ++taskIdx;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Start executing a job at its current level (pJob->levelIdx).
 *
 * The job status is switched to EXECUTING, the level's flow-control check
 * runs (schCheckJobNeedFlowCtrl), and then every task of the level is
 * launched. The first failing step's code is returned.
 */
int32_t schLaunchJob(SSchJob *pJob) {
  SSchLevel *pCurLevel = taosArrayGet(pJob->levels, pJob->levelIdx);

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));
  SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, pCurLevel));
  SCH_ERR_RET(schLaunchLevelTasks(pJob, pCurLevel));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2322
/*
 * Ask every node the task was executed on to drop it.
 *
 * For each entry of pTask->execNodes, the task handle is switched to that
 * node's handle and a TDMT_VND_DROP_TASK message is sent to the node's
 * address. Send results are deliberately ignored, so this is best-effort.
 */
void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->execNodes == NULL) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosArrayGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  for (int32_t nodeIdx = 0; nodeIdx < nodeNum; ++nodeIdx) {
    SSchNodeInfo *pNode = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, nodeIdx);

    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    (void)schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_VND_DROP_TASK);
  }

  SCH_TASK_DLOG("task has %d exec address", nodeNum);
}

/*
 * Drop, on their executing nodes, all tasks stored in the given task hash.
 * Hash values are SSchTask pointers. Does nothing unless
 * SCH_IS_NEED_DROP_JOB(pJob) holds.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (SCH_IS_NEED_DROP_JOB(pJob)) {
    for (void *it = taosHashIterate(list, NULL); it != NULL; it = taosHashIterate(list, it)) {
      schDropTaskOnExecutedNode(pJob, *(SSchTask **)it);
    }
  }
}
H
Haojun Liao 已提交
2360

D
dapan1121 已提交
2361 2362 2363 2364
/*
 * Ask executing nodes to drop every task of the job. The exec, succeeded and
 * failed task hashes are processed in that order.
 */
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *taskHashes[] = {pJob->execTasks, pJob->succTasks, pJob->failTasks};
  int32_t   hashNum = (int32_t)(sizeof(taskHashes) / sizeof(taskHashes[0]));

  for (int32_t i = 0; i < hashNum; ++i) {
    schDropTaskInHashList(pJob, taskHashes[i]);
  }
}
2366

D
dapan1121 已提交
2367
/*
 * Cancel a job.
 *
 * Not implemented yet: this is a no-op that always reports success.
 * TODO: move all tasks from the exec list to the fail list.
 */
int32_t schCancelJob(SSchJob *pJob) {
  (void)pJob;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2373
/*
 * Close the job ref set once the scheduler is exiting and no job remains.
 *
 * Only acts after schMgmt.exit has been set (schedulerDestroy does this). The
 * jobNum check and the close happen under the scheduler write lock, and
 * jobRef is reset to -1, so the set is closed at most once.
 */
void schCloseJobRef(void) {
  if (atomic_load_8((int8_t *)&schMgmt.exit)) {
    SCH_LOCK(SCH_WRITE, &schMgmt.lock);

    bool noJobLeft = atomic_load_32(&schMgmt.jobNum) <= 0;
    if (noJobLeft && schMgmt.jobRef >= 0) {
      taosCloseRef(schMgmt.jobRef);
      schMgmt.jobRef = -1;
    }

    SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
  }
}

D
dapan1121 已提交
2386 2387 2388 2389 2390 2391 2392
void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
L
Liu Jicong 已提交
2393
  int64_t  refId = pJob->refId;
D
dapan1121 已提交
2394 2395 2396 2397 2398 2399 2400

  if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
    schCancelJob(pJob);
  }

  schDropJobAllTasks(pJob);

L
Liu Jicong 已提交
2401 2402
  pJob->subPlans = NULL;  // it is a reference to pDag->pSubplans

D
dapan1121 已提交
2403
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
L
Liu Jicong 已提交
2404
  for (int32_t i = 0; i < numOfLevels; ++i) {
D
dapan1121 已提交
2405 2406
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

D
dapan1121 已提交
2407
    schFreeFlowCtrl(pLevel);
L
Liu Jicong 已提交
2408

D
dapan1121 已提交
2409
    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
L
Liu Jicong 已提交
2410 2411
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
2412 2413 2414 2415 2416
      schFreeTask(pTask);
    }

    taosArrayDestroy(pLevel->subTasks);
  }
L
Liu Jicong 已提交
2417

D
dapan1121 已提交
2418 2419 2420
  taosHashCleanup(pJob->execTasks);
  taosHashCleanup(pJob->failTasks);
  taosHashCleanup(pJob->succTasks);
L
Liu Jicong 已提交
2421

D
dapan1121 已提交
2422 2423
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
L
Liu Jicong 已提交
2424

D
dapan1121 已提交
2425 2426
  qExplainFreeCtx(pJob->explainCtx);

wafwerar's avatar
wafwerar 已提交
2427 2428
  taosMemoryFreeClear(pJob->resData);
  taosMemoryFreeClear(pJob);
D
dapan1121 已提交
2429

L
Liu Jicong 已提交
2430
  qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
2431 2432 2433 2434

  atomic_sub_fetch_32(&schMgmt.jobNum, 1);

  schCloseJobRef();
D
dapan1121 已提交
2435 2436
}

L
Liu Jicong 已提交
2437
/*
 * Create a job for pDag, launch its current level and report its ref id.
 *
 * On success *job receives the job's ref id. When syncSchedule is true, the
 * caller blocks on pJob->rspSem before returning. On failure the error from
 * schInitJob()/schLaunchJob() is returned and *job is left untouched.
 */
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                              int64_t startTs, bool syncSchedule) {
  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
  }

  int32_t  code = 0;
  SSchJob *pJob = NULL;
  SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));

  SCH_ERR_JRET(schLaunchJob(pJob));

  *job = pJob->refId;

  if (syncSchedule) {
    SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  if (NULL != pJob) {
    // Once schInitJob() succeeded, the job is registered in schMgmt.jobRef and
    // acquired by us (the success path balances that with schReleaseJob()).
    // Freeing it directly would leave a dangling entry in the ref set. Instead,
    // remove the ref and drop our acquire so the ref set's free callback
    // (schFreeJobImpl) runs exactly once.
    // NOTE(review): assumes pJob is non-NULL only after schInitJob() succeeded.
    int64_t refId = pJob->refId;
    taosRemoveRef(schMgmt.jobRef, refId);
    schReleaseJob(refId);
  }

  SCH_RET(code);
}
D
dapan1121 已提交
2469

D
dapan1121 已提交
2470
/*
 * Answer an EXPLAIN in static mode without dispatching any task.
 *
 * A lightweight job is allocated and the explain output is produced locally by
 * qExecStaticExplain() into pJob->resData. The job is then registered in the
 * job ref set, so the caller fetches the result through *job like any other
 * job (status PARTIAL_SUCCEED, result type FETCH).
 *
 * transport, pNodeList and syncSchedule keep the signature parallel to
 * schExecJobImpl() but are not used.
 */
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                             bool syncSchedule) {
  (void)transport;
  (void)pNodeList;
  (void)syncSchedule;

  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  int32_t  code = 0;
  int64_t  refId = -1;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Only the fields a static explain job needs; calloc zeroed everything else.
  pJob->sql = sql;
  pJob->queryId = pDag->queryId;
  pJob->subPlans = pDag->pSubplans;
  pJob->attr.queryJob = true;
  pJob->attr.explainMode = pDag->explainInfo.mode;

  // The whole result is computed up front, locally.
  SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
  pJob->resType = SCH_RES_TYPE_FETCH;

  refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;
  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
  *job = pJob->refId;
  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  schFreeJobImpl(pJob);
  SCH_RET(code);
}

D
dapan1121 已提交
2520
/*
 * Initialize the scheduler module.
 *
 * Applies the configuration (unset maxJobNum / maxNodeTableNum fall back to
 * the defaults; a NULL cfg means all defaults). It then opens the job ref set
 * with schFreeJobImpl as the free callback, creates the heartbeat-connection
 * hash, and generates the scheduler id.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT if already initialized. If a step after
 * opening the job ref set fails, the partial state is torn down
 * (hbConnections cleaned, jobRef closed and reset to -1). Previously a failed
 * init left jobRef open, so every retry was rejected as "already initialized".
 */
int32_t schedulerInit(SSchedulerCfg *cfg) {
  int32_t code = 0;

  if (schMgmt.jobRef >= 0) {
    qError("scheduler already initialized");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (cfg) {
    schMgmt.cfg = *cfg;

    if (schMgmt.cfg.maxJobNum == 0) {
      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    }
    if (schMgmt.cfg.maxNodeTableNum <= 0) {
      schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
    }
  } else {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
  }

  schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
  if (schMgmt.jobRef < 0) {
    qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  schMgmt.hbConnections = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == schMgmt.hbConnections) {
    qError("taosHashInit hb connections failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
    qError("generate schdulerId failed, errno:%d", errno);
    SCH_ERR_JRET(TSDB_CODE_QRY_SYS_ERROR);
  }

  qInfo("scheduler %" PRIx64 " initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);

  return TSDB_CODE_SUCCESS;

_return:

  // Roll back so a later schedulerInit() can retry from a clean state.
  if (schMgmt.hbConnections) {
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
  }

  taosCloseRef(schMgmt.jobRef);
  schMgmt.jobRef = -1;

  SCH_RET(code);
}

L
Liu Jicong 已提交
2562
/*
 * Synchronously execute a query plan and collect its result summary.
 *
 * A static EXPLAIN is answered locally by schExecStaticExplain(). Otherwise
 * the job runs through schExecJobImpl() with syncSchedule = true. On success
 * *pJob holds the job's ref id and pRes receives the job's errCode and
 * resNumOfRows. For query-type results, pRes->res also takes ownership of
 * resData (the job's pointer is cleared).
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT on NULL inputs. Returns
 * TSDB_CODE_SCH_STATUS_ERROR if the job is already gone when the result is
 * collected. Otherwise returns the execution error, if any.
 */
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
                         int64_t startTs, SQueryResult *pRes) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
    SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
  } else {
    SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
  }

  SSchJob *job = schAcquireJob(*pJob);
  if (NULL == job) {
    // The job may have been removed from the ref set (e.g. freed or dropped)
    // since execution returned; do not dereference NULL.
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, *pJob);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pRes->code = atomic_load_32(&job->errCode);
  pRes->numOfRows = job->resNumOfRows;
  if (SCH_RES_TYPE_QUERY == job->resType) {
    pRes->res = job->resData;
    job->resData = NULL;
  }

  schReleaseJob(*pJob);

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2588
/*
 * Asynchronous counterpart of schedulerExecJob().
 *
 * Validates the inputs and starts the job with syncSchedule = false, so the
 * call does not wait on the job's response semaphore. A static EXPLAIN is
 * answered locally by schExecStaticExplain(). On success *pJob receives the
 * job's ref id.
 */
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
  bool badInput = (NULL == transport) || (NULL == pDag) || (NULL == pDag->pSubplans) || (NULL == pJob);
  if (badInput) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t execCode = (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode)
                         ? schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false)
                         : schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false);
  SCH_ERR_RET(execCode);

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2602
#if 0
X
Xiaoyu Wang 已提交
2603
/*
 * Convert a single-level DAG into standalone STaskInfo entries, one per
 * subplan. Each entry carries a freshly allocated SSubQueryMsg that embeds
 * the serialized plan.
 *
 * On success *pTasks owns the new array (release it with
 * schedulerFreeTaskList()). On failure everything allocated here is released
 * and *pTasks is left untouched.
 *
 * Fixed here: the temporary serialized plan was leaked on every iteration, the
 * SSubQueryMsg allocation was not checked, and the message leaked when the
 * array push failed.
 */
int32_t schedulerConvertDagToTaskList(SQueryPlan *pDag, SArray **pTasks) {
  if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = LIST_LENGTH(pDag->pSubplans);
  if (1 != levelNum) {
    qError("invalid level num: %d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SNodeListNode *plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, 0);
  int32_t        taskNum = LIST_LENGTH(plans->pNodeList);
  if (taskNum <= 0) {
    qError("invalid task num: %d", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo));
  if (NULL == info) {
    qError("taosArrayInit %d taskInfo failed", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  int32_t code = 0;

  for (int32_t i = 0; i < taskNum; ++i) {
    SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, i);
    char     *msg = NULL;
    int32_t   msgLen = 0;

    code = qSubPlanToString(plan, &msg, &msgLen);
    if (TSDB_CODE_SUCCESS != code) {
      qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);
      SCH_ERR_JRET(code);
    }

    if (NULL == msg) {
      qError("subplanToString returned no msg, len:%d", msgLen);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    int32_t       msgSize = sizeof(SSubQueryMsg) + msgLen;
    SSubQueryMsg *pMsg = taosMemoryCalloc(1, msgSize);
    if (NULL == pMsg) {
      qError("calloc %d failed", msgSize);
      taosMemoryFreeClear(msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pMsg->header.vgId = plan->execNode.nodeId;
    pMsg->sId = schMgmt.sId;
    pMsg->queryId = plan->id.queryId;
    pMsg->taskId = schGenUUID();
    pMsg->taskType = TASK_TYPE_PERSISTENT;
    pMsg->phyLen = msgLen;
    pMsg->sqlLen = 0;
    memcpy(pMsg->msg, msg, msgLen);

    // The plan now lives inside pMsg; the temporary string is no longer needed.
    taosMemoryFreeClear(msg);

    STaskInfo tInfo = {.addr = plan->execNode, .msg = pMsg};
    if (NULL == taosArrayPush(info, &tInfo)) {
      qError("taosArrayPush failed, idx:%d", i);
      taosMemoryFree(pMsg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  *pTasks = info;
  info = NULL;

_return:
  schedulerFreeTaskList(info);
  SCH_RET(code);
}

D
dapan1121 已提交
2678 2679 2680 2681 2682 2683
/*
 * Make copyNum independent copies of one task into a new array stored in
 * *dst. Every copy gets its own duplicate of src->msg (header plus phyLen
 * bytes of plan) and a fresh taskId.
 *
 * On failure the partially built array is freed and *dst is set to NULL.
 */
int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
  if (NULL == src || NULL == dst || copyNum <= 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SArray *copies = taosArrayInit(copyNum, sizeof(STaskInfo));
  *dst = copies;
  if (NULL == copies) {
    qError("taosArrayInit %d taskInfo failed", copyNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // All copies share one size: the fixed message header plus the plan bytes.
  int32_t msgSize = src->msg->phyLen + sizeof(*src->msg);

  for (int32_t idx = 0; idx < copyNum; ++idx) {
    STaskInfo copy = {0};
    copy.addr = src->addr;
    copy.msg = taosMemoryMalloc(msgSize);
    if (NULL == copy.msg) {
      qError("malloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(copy.msg, src->msg, msgSize);
    copy.msg->taskId = schGenUUID();

    if (NULL == taosArrayPush(copies, &copy)) {
      qError("taosArrayPush failed, idx:%d", idx);
      taosMemoryFree(copy.msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  schedulerFreeTaskList(copies);
  *dst = NULL;

  SCH_RET(code);
}
L
Liu Jicong 已提交
2723
#endif
D
dapan1121 已提交
2724

L
Liu Jicong 已提交
2725
int32_t schedulerFetchRows(int64_t job, void **pData) {
D
dapan1121 已提交
2726
  if (NULL == pData) {
D
dapan1121 已提交
2727
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
2728 2729
  }

L
Liu Jicong 已提交
2730
  int32_t  code = 0;
D
dapan1121 已提交
2731
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2732 2733 2734 2735
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2736

D
dapan1121 已提交
2737 2738
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (status == JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
2739
    SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2740
    schReleaseJob(job);
D
dapan1121 已提交
2741
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
2742 2743
  }

D
dapan1121 已提交
2744
  if (!SCH_JOB_NEED_FETCH(pJob)) {
D
dapan1121 已提交
2745
    SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
D
dapan1121 已提交
2746
    schReleaseJob(job);
D
dapan1121 已提交
2747
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
2748 2749
  }

D
dapan1121 已提交
2750 2751
  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
D
dapan1121 已提交
2752
    schReleaseJob(job);
D
dapan1121 已提交
2753
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
2754 2755
  }

D
dapan1121 已提交
2756
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2757
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2758 2759
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
  } else if (status == JOB_TASK_STATUS_SUCCEED) {
D
dapan1121 已提交
2760
    SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2761 2762
    goto _return;
  } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan1121 已提交
2763
    if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
D
dapan1121 已提交
2764 2765
      SCH_ERR_JRET(schFetchFromRemote(pJob));
      tsem_wait(&pJob->rspSem);
H
Hongze Cheng 已提交
2766
    }
D
dapan1121 已提交
2767 2768 2769
  } else {
    SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan 已提交
2770 2771
  }

D
dapan1121 已提交
2772
  status = SCH_GET_JOB_STATUS(pJob);
D
dapan 已提交
2773

D
dapan1121 已提交
2774
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2775
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2776
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
D
dapan 已提交
2777
  }
L
Liu Jicong 已提交
2778

D
dapan1121 已提交
2779
  if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
D
dapan1121 已提交
2780
    SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
D
dapan 已提交
2781 2782
  }

D
dapan1121 已提交
2783
  while (true) {
D
dapan1121 已提交
2784 2785
    *pData = atomic_load_ptr(&pJob->resData);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
D
dapan1121 已提交
2786 2787 2788 2789 2790
      continue;
    }

    break;
  }
D
dapan 已提交
2791

D
dapan1121 已提交
2792
  if (NULL == *pData) {
wafwerar's avatar
wafwerar 已提交
2793
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
2794 2795 2796 2797 2798
    if (rsp) {
      rsp->completed = 1;
    }

    *pData = rsp;
D
dapan1121 已提交
2799
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
D
dapan1121 已提交
2800
  }
D
dapan1121 已提交
2801

2802
  SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
D
dapan1121 已提交
2803 2804 2805 2806

_return:

  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
L
Liu Jicong 已提交
2807

D
dapan1121 已提交
2808
  schReleaseJob(job);
D
dapan 已提交
2809

D
dapan1121 已提交
2810
  SCH_RET(code);
D
dapan 已提交
2811
}
D
dapan1121 已提交
2812

D
dapan1121 已提交
2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827
/*
 * Append the status of every task of a job to pSub.
 *
 * Levels are walked from the last one down to level 0, and tasks in array
 * order within each level. Each pushed element is an SQuerySubDesc holding the
 * task id and its current status.
 *
 * Returns:
 *   - TSDB_CODE_QRY_INVALID_INPUT for a NULL pSub;
 *   - TSDB_CODE_SCH_STATUS_ERROR if the job cannot be acquired or is not an
 *     initialized, executable job;
 *   - TSDB_CODE_QRY_OUT_OF_MEMORY if appending to pSub fails.
 *
 * The job acquired here is now released on every path; previously it was
 * never released, which leaked a reference and kept the job alive.
 */
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
  if (NULL == pSub) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t  code = 0;
  SSchJob *pJob = schAcquireJob(job);
  if (NULL == pJob) {
    qDebug("acquire job from jobRef list failed, may not started or dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) {
    qDebug("job not initialized or not executable job, refId:%" PRIx64, job);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask     *pTask = taosArrayGet(pLevel->subTasks, m);
      SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};

      if (NULL == taosArrayPush(pSub, &subDesc)) {
        qError("taosArrayPush task status failed, refId:%" PRIx64, job);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

_return:

  schReleaseJob(job);

  SCH_RET(code);
}

D
dapan1121 已提交
2840
int32_t scheduleCancelJob(int64_t job) {
D
dapan1121 已提交
2841
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2842 2843 2844 2845
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2846

D
dapan1121 已提交
2847 2848
  int32_t code = schCancelJob(pJob);

D
dapan1121 已提交
2849
  schReleaseJob(job);
D
dapan1121 已提交
2850 2851

  SCH_RET(code);
D
dapan1121 已提交
2852 2853
}

D
dapan1121 已提交
2854
void schedulerFreeJob(int64_t job) {
D
dapan1121 已提交
2855
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2856
  if (NULL == pJob) {
D
dapan1121 已提交
2857
    qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
D
dapan 已提交
2858 2859
    return;
  }
D
dapan1121 已提交
2860

D
dapan1121 已提交
2861 2862
  if (atomic_load_8(&pJob->userFetch) > 0) {
    schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
2863
  }
D
dapan1121 已提交
2864

D
dapan1121 已提交
2865
  SCH_JOB_DLOG("start to remove job from jobRef list, refId:%" PRIx64, job);
2866

D
dapan1121 已提交
2867 2868
  if (taosRemoveRef(schMgmt.jobRef, job)) {
    SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job);
2869
  }
D
dapan1121 已提交
2870 2871

  schReleaseJob(job);
D
dapan1121 已提交
2872
}
D
dapan1121 已提交
2873 2874 2875 2876 2877 2878 2879 2880 2881

/*
 * Free a task list: the msg buffer of every STaskInfo element, then the array
 * itself. A NULL list is ignored.
 */
void schedulerFreeTaskList(SArray *taskList) {
  if (taskList != NULL) {
    int32_t taskNum = taosArrayGetSize(taskList);

    for (int32_t idx = 0; idx < taskNum; ++idx) {
      STaskInfo *pInfo = taosArrayGet(taskList, idx);
      taosMemoryFreeClear(pInfo->msg);
    }

    taosArrayDestroy(taskList);
  }
}
L
Liu Jicong 已提交
2887

D
dapan1121 已提交
2888
void schedulerDestroy(void) {
2889 2890
  atomic_store_8((int8_t *)&schMgmt.exit, 1);

D
dapan1121 已提交
2891
  if (schMgmt.jobRef >= 0) {
D
dapan1121 已提交
2892
    SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
H
Hongze Cheng 已提交
2893
    int64_t  refId = 0;
C
Cary Xu 已提交
2894

D
dapan1121 已提交
2895
    while (pJob) {
D
dapan1121 已提交
2896
      refId = pJob->refId;
C
Cary Xu 已提交
2897 2898 2899
      if (refId == 0) {
        break;
      }
D
dapan1121 已提交
2900
      taosRemoveRef(schMgmt.jobRef, pJob->refId);
L
Liu Jicong 已提交
2901

D
dapan1121 已提交
2902
      pJob = taosIterateRef(schMgmt.jobRef, refId);
D
dapan1121 已提交
2903
    }
D
dapan1121 已提交
2904
  }
D
dapan1121 已提交
2905 2906

  if (schMgmt.hbConnections) {
H
Hongze Cheng 已提交
2907
    void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
D
dapan1121 已提交
2908 2909 2910 2911
    while (pIter != NULL) {
      SSchHbTrans *hb = pIter;
      schFreeRpcCtx(&hb->rpcCtx);
      pIter = taosHashIterate(schMgmt.hbConnections, pIter);
H
Hongze Cheng 已提交
2912
    }
D
dapan1121 已提交
2913 2914 2915
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
  }
D
dapan1121 已提交
2916
}