/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "catalog.h"
H
Hongze Cheng 已提交
17
#include "command.h"
L
Liu Jicong 已提交
18
#include "query.h"
D
dapan1121 已提交
19
#include "schedulerInt.h"
H
Hongze Cheng 已提交
20
#include "tmsg.h"
D
dapan1121 已提交
21
#include "tref.h"
D
dapan1121 已提交
22
#include "trpc.h"
23

D
dapan1121 已提交
24
SSchedulerMgmt schMgmt = {
25
    .jobRef = -1,
D
dapan1121 已提交
26
};
D
dapan1121 已提交
27

L
Liu Jicong 已提交
28
/* Acquire refId from the schMgmt.jobRef reference set and return the result as a job pointer. */
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
  void *pRef = taosAcquireRef(schMgmt.jobRef, refId);
  return (SSchJob *)pRef;
}
D
dapan1121 已提交
29

L
Liu Jicong 已提交
30
/* Release refId in the schMgmt.jobRef reference set; returns whatever taosReleaseRef reports. */
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
  return taosReleaseRef(schMgmt.jobRef, refId);
}
D
dapan1121 已提交
31

L
Liu Jicong 已提交
32
/* Produce the next task id by atomically incrementing schMgmt.taskId by one. */
uint64_t schGenTaskId(void) {
  return atomic_add_fetch_64(&schMgmt.taskId, 1);
}
D
dapan1121 已提交
33

L
Liu Jicong 已提交
34
#if 0
/*
 * Disabled. Packs a 64-bit id from: 12 bits of a hash of the system UUID,
 * 12 bits of the process id, 24 bits of a millisecond timestamp and 16 bits
 * of a per-process serial counter.
 */
uint64_t schGenUUID(void) {
  static uint64_t hashId = 0;
  static int32_t  requestSerialId = 0;

  // the UUID hash is computed lazily on the first call that succeeds
  if (0 == hashId) {
    char    uid[64];
    int32_t code = taosGetSystemUUID(uid, tListLen(uid));
    if (TSDB_CODE_SUCCESS == code) {
      hashId = MurmurHash3_32(uid, strlen(uid));
    } else {
      qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
  }

  int64_t  ts = taosGetTimestampMs();
  uint64_t pid = taosGetPId();
  int32_t  val = atomic_add_fetch_32(&requestSerialId, 1);

  uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
  return id;
}
#endif
D
dapan1121 已提交
57

L
Liu Jicong 已提交
58 59 60
/*
 * Prepare a task for scheduling: assign a fresh task id, attach the subplan
 * and level it belongs to, mark it NOT_START and allocate its execNodes array.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the execNodes
 * array cannot be allocated.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  pTask->taskId = schGenTaskId();
  pTask->plan = pPlan;
  pTask->level = pLevel;
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);

  pTask->execNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo));
  if (NULL != pTask->execNodes) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);

  return TSDB_CODE_QRY_OUT_OF_MEMORY;
}

D
dapan1121 已提交
72
/*
 * Allocate a scheduler job for the query plan pDag and register it in the
 * schMgmt.jobRef reference set.
 *
 * pSchJob      out: receives the new job on success only
 * pDag         query plan; validated and turned into levels/tasks by
 *              schValidateAndBuildJob
 * transport    stored as-is in the job
 * pNodeList    optional node list; when non-NULL a copy is stored in the job
 * sql          stored as-is in the job (the pointer is kept, not copied)
 * startTs      forwarded to qExecExplainBegin for explain jobs
 * syncSchedule stored in the job attributes
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure the job is released
 * again: through schFreeJobImpl while it has no reference id yet, otherwise
 * through taosRemoveRef.
 */
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
                   int64_t startTs, bool syncSchedule) {
  int32_t  code = 0;
  int64_t  refId = -1;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->attr.explainMode = pDag->explainInfo.mode;
  pJob->attr.syncSchedule = syncSchedule;
  pJob->transport = transport;
  pJob->sql = sql;

  if (pNodeList != NULL) {
    pJob->nodeList = taosArrayDup(pNodeList);
    if (NULL == pJob->nodeList) {
      // pJob->queryId is not filled in yet, so log with the plan's query id
      qError("QID:%" PRIx64 " taosArrayDup node list failed", pDag->queryId);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs));
  }

  pJob->execTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->succTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->succTasks) {
    SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->failTasks =
      taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->failTasks) {
    SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  tsem_init(&pJob->rspSem, 0, 0);

  refId = taosAddRef(schMgmt.jobRef, pJob);
  if (refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  atomic_add_fetch_32(&schMgmt.jobNum, 1);

  if (NULL == schAcquireJob(refId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = refId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_NOT_START;

  *pSchJob = pJob;

  return TSDB_CODE_SUCCESS;

_return:

  // refId < 0 means the job was never added to the reference set
  if (refId < 0) {
    schFreeJobImpl(pJob);
  } else {
    taosRemoveRef(schMgmt.jobRef, refId);
  }
  SCH_RET(code);
}

D
dapan1121 已提交
153 154 155 156 157 158 159 160
/*
 * Release everything held by an RPC context: call the free callback of every
 * entry in pCtx->args, clean up the args hash, then free the brokenVal value
 * through its own callback. The SRpcCtx structure itself is not freed.
 * A NULL pCtx is accepted and ignored.
 */
void schFreeRpcCtx(SRpcCtx *pCtx) {
  if (NULL == pCtx) {
    return;
  }

  void *pIter = taosHashIterate(pCtx->args, NULL);
  while (pIter) {
    SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;

    // guard the callback the same way brokenVal.freeFunc is guarded below
    if (ctxVal->freeFunc) {
      (*ctxVal->freeFunc)(ctxVal->val);
    }

    pIter = taosHashIterate(pCtx->args, pIter);
  }

  taosHashCleanup(pCtx->args);

  if (pCtx->brokenVal.freeFunc) {
    (*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
  }
}

L
Liu Jicong 已提交
173
/*
 * Release the heap members of a task: the candidateAddrs, children, parents
 * and execNodes arrays plus the msg buffer. The SSchTask structure itself is
 * left to its owner.
 */
void schFreeTask(SSchTask *pTask) {
  SArray *ownedArrays[] = {pTask->candidateAddrs, pTask->children, pTask->parents, pTask->execNodes};

  for (size_t i = 0; i < sizeof(ownedArrays) / sizeof(ownedArrays[0]); ++i) {
    if (NULL != ownedArrays[i]) {
      taosArrayDestroy(ownedArrays[i]);
    }
  }

  taosMemoryFreeClear(pTask->msg);
}

D
dapan1121 已提交
193 194 195 196 197 198
/*
 * Report whether the job is in a state where no further work should be done:
 * FAILED, CANCELLED, CANCELLING, DROPPING or SUCCEED.
 * When pStatus is non-NULL it receives the status value that was examined.
 */
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  const int8_t curStatus = SCH_GET_JOB_STATUS(pJob);
  if (NULL != pStatus) {
    *pStatus = curStatus;
  }

  switch (curStatus) {
    case JOB_TASK_STATUS_FAILED:
    case JOB_TASK_STATUS_CANCELLED:
    case JOB_TASK_STATUS_CANCELLING:
    case JOB_TASK_STATUS_DROPPING:
    case JOB_TASK_STATUS_SUCCEED:
      return true;
    default:
      return false;
  }
}

D
dapan1121 已提交
204
/*
 * Check whether a response of type msgType is acceptable for pTask, based on
 * the last message type recorded on the task and the task's current status.
 *
 * - LINK_BROKEN and EXPLAIN_RSP are always accepted.
 * - QUERY_RSP is always accepted; mismatches are only logged at debug level.
 * - RES_READY_RSP, FETCH_RSP and the create/drop/alter table and submit
 *   responses must match the last sent request (RES_READY_RSP and FETCH_RSP
 *   also tolerate a recorded type of -1) and require the task to be EXECUTING
 *   or PARTIAL_SUCCEED.
 * - Any other type is rejected with TSDB_CODE_QRY_INVALID_INPUT.
 *
 * On every accepted path except LINK_BROKEN/EXPLAIN_RSP the recorded last
 * message type is reset to -1.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_STATUS_ERROR or
 * TSDB_CODE_QRY_INVALID_INPUT.
 */
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
  int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
  int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  // by default the matching request type is taken to be the value right before the response type
  int32_t reqMsgType = msgType - 1;
  switch (msgType) {
    case TDMT_SCH_LINK_BROKEN:
    case TDMT_VND_EXPLAIN_RSP:
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_QUERY_RSP:  // query_rsp may be processed later than ready_rsp
      if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
        SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
                      TMSG_INFO(msgType));
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_RES_READY_RSP:
      // a ready response answers the query request, not "msgType - 1"
      reqMsgType = TDMT_VND_QUERY;
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
                      (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_FETCH_RSP:
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                      TMSG_INFO(msgType));
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_CREATE_TABLE_RSP:
    case TDMT_VND_DROP_TABLE_RSP:
    case TDMT_VND_ALTER_TABLE_RSP:
    case TDMT_VND_SUBMIT_RSP:
      break;
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (lastMsgType != reqMsgType) {
    // lastMsgType may be -1 here; guard the lookup the same way the RES_READY_RSP branch does
    SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
                  (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
    SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
                  TMSG_INFO(msgType));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);

  return TSDB_CODE_SUCCESS;
}

/*
 * Move the job to newStatus if that transition is allowed from its current
 * status, using a compare-and-exchange on pJob->status and re-reading the
 * status whenever the exchange loses against a concurrent update.
 *
 * Allowed transitions, as encoded below:
 *   NULL                        -> NOT_START
 *   NOT_START                   -> EXECUTING
 *   EXECUTING                   -> PARTIAL_SUCCEED, FAILED, CANCELLING, CANCELLED, DROPPING
 *   PARTIAL_SUCCEED             -> FAILED, SUCCEED, DROPPING
 *   SUCCEED, FAILED, CANCELLING -> DROPPING
 * A job that is CANCELLED or DROPPING yields TSDB_CODE_QRY_JOB_FREED; every
 * other rejected update, including newStatus equal to the current status,
 * yields TSDB_CODE_QRY_APP_ERROR.
 */
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    if (newStatus == oriStatus) {
      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        if (!(JOB_TASK_STATUS_NOT_START == newStatus)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_NOT_START:
        if (!(JOB_TASK_STATUS_EXECUTING == newStatus)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_EXECUTING:
        if (!(JOB_TASK_STATUS_PARTIAL_SUCCEED == newStatus || JOB_TASK_STATUS_FAILED == newStatus ||
              JOB_TASK_STATUS_CANCELLING == newStatus || JOB_TASK_STATUS_CANCELLED == newStatus ||
              JOB_TASK_STATUS_DROPPING == newStatus)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_PARTIAL_SUCCEED:
        if (!(JOB_TASK_STATUS_FAILED == newStatus || JOB_TASK_STATUS_SUCCEED == newStatus ||
              JOB_TASK_STATUS_DROPPING == newStatus)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_SUCCEED:
      case JOB_TASK_STATUS_FAILED:
      case JOB_TASK_STATUS_CANCELLING:
        if (!(JOB_TASK_STATUS_DROPPING == newStatus)) {
          SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
        }
        break;

      case JOB_TASK_STATUS_CANCELLED:
      case JOB_TASK_STATUS_DROPPING:
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;

      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    // the exchange returns the previous value; a match means our update was installed
    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      break;
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  SCH_ERR_RET(code);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
359
/*
 * Wire up parent/child links between the tasks of a job.
 *
 * For every task on every level, the children and parents of its subplan are
 * looked up in planToTask (keyed by subplan pointer, value is a task pointer)
 * and pushed into pTask->children / pTask->parents.
 *
 * Consistency rules enforced here:
 *   - a task on the level equal to pJob->levelIdx must have no children;
 *   - a task on level 0 must have no parents, any other task must have some;
 *   - a query job must have exactly one task on level 0.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR for an inconsistent
 * plan, or TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan  *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("children info, the %d child TID %" PRIx64, n, (*childTask)->taskId);
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan  *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n indexes the parent list here, so label it as such
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }

        SCH_TASK_DLOG("parents info, the %d parent TID %" PRIx64, n, (*parentTask)->taskId);
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) {
    SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
445
/*
 * Remember the address the task succeeded on: copy the candidate address at
 * pTask->candidateIdx into pTask->succeedAddr.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when no candidate
 * exists at that index.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pCurAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (NULL != pCurAddr) {
    pTask->succeedAddr = *pCurAddr;
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                (int32_t)taosArrayGetSize(pTask->candidateAddrs));
  SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);

  return TSDB_CODE_SCH_INTERNAL_ERROR;
}

D
dapan1121 已提交
458 459
/*
 * Append an (address, handle) pair to the task's execNodes array.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the entry
 * cannot be appended.
 */
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) {
  SSchNodeInfo execNode = {0};
  execNode.addr = *addr;
  execNode.handle = handle;

  if (NULL != taosArrayPush(pTask->execNodes, &execNode)) {
    SCH_TASK_DLOG("task execNode recorded, handle:%p", handle);
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);

  return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
D
dapan1121 已提交
470

D
dapan1121 已提交
471 472 473 474 475 476 477 478 479 480 481
/*
 * Register pTask in pJob->dataSrcTasks when it is a data-source query task
 * (per SCH_IS_DATA_SRC_QRY_TASK); other tasks are ignored.
 * The array stores the task pointer, not a copy of the task.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the pointer
 * cannot be appended.
 */
int32_t schRecordQueryDataSrc(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
    SCH_TASK_ELOG("taosArrayPush task to dataSrcTasks failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}


X
Xiaoyu Wang 已提交
482
/*
 * Validate the query plan pDag and build the job's level/task structure.
 *
 * One SSchLevel is created per entry of pDag->pSubplans and one SSchTask per
 * subplan inside it. While building, a temporary hash (subplan pointer ->
 * task pointer) is filled and then used by schBuildTaskRalation to connect
 * parents and children. The temporary hash is always released before return.
 *
 * Task and level pointers handed out here point into pLevel->subTasks and
 * pJob->levels; both arrays are created with their final capacity up front.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for an empty or
 * malformed plan, an out-of-memory code, or whatever schInitTask,
 * schRecordQueryDataSrc or schBuildTaskRalation report. Structures already
 * attached to pJob are left for the caller to release on failure.
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
  if (NULL == pJob->dataSrcTasks) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // keys are subplan pointers, so pick the hash function by pointer width
  SHashObj *planToTask = taosHashInit(
      SCHEDULE_DEFAULT_MAX_TASK_NUM,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->levelNum = levelNum;
  pJob->levelIdx = levelNum - 1;

  pJob->subPlans = pDag->pSubplans;

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_NOT_START;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = &task;

      SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));

      void *p = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == p) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        // the local task never made it into subTasks, so release what schInitTask allocated for it here
        schFreeTask(pTask);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      // from here on p (the copy stored in subTasks) is the task everyone refers to
      SCH_ERR_JRET(schRecordQueryDataSrc(pJob, p));

      if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:
  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
597 598
/* Discard the task's candidate address array so that it can be rebuilt by a later call. */
static void schDropTaskCandidateAddrs(SSchTask *pTask) {
  taosArrayDestroy(pTask->candidateAddrs);
  pTask->candidateAddrs = NULL;
}

/*
 * Fill pTask->candidateAddrs with the addresses the task may be sent to.
 *
 * Nothing is done when the array already exists. Otherwise the array is
 * created and filled from one of two sources:
 *   - the subplan's own execNode, when its epSet has at least one endpoint;
 *   - else up to SCH_MAX_CANDIDATE_EP_NUM entries from pJob->nodeList.
 *
 * On every failure the array is destroyed and reset to NULL, so the early
 * "already set" return above never reports success for an empty or
 * half-filled candidate list on a later call.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or
 * TSDB_CODE_QRY_INVALID_INPUT when no candidate could be found.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateIdx = 0;
  pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pTask->plan->execNode.epSet.numOfEps > 0) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      // log first: errno must be read before the cleanup below can disturb it
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      schDropTaskCandidateAddrs(pTask);
      SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  int32_t addNum = 0;
  int32_t nodeNum = 0;
  if (pJob->nodeList) {
    nodeNum = taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
        schDropTaskCandidateAddrs(pTask);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    schDropTaskCandidateAddrs(pTask);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
653

D
dapan1121 已提交
654 655 656 657 658 659 660 661 662 663 664
/*
 * Remove the task (keyed by its taskId) from the job's execTasks hash.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when the hash
 * removal reports a non-zero code.
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t rmCode = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (0 == rmCode) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("task failed to rm from execTask list, code:%x", rmCode);
  SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);

  return TSDB_CODE_SCH_INTERNAL_ERROR;
}


D
dapan1121 已提交
665
/*
 * Insert the task pointer into the job's execTasks hash, keyed by taskId.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR when the task is
 * already present, or TSDB_CODE_QRY_OUT_OF_MEMORY for any other insert failure.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putCode = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == putCode) {
    SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(putCode)) {
    SCH_TASK_ELOG("task already in execTask list, code:%x", putCode);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);

  return TSDB_CODE_QRY_OUT_OF_MEMORY;
}

D
dapan1121 已提交
682 683
/*
 * Move a task from the job's execTasks hash to its succTasks hash.
 *
 * A failed removal from execTasks is only logged as a warning. *moved is set
 * to false on entry and to true once the task is known to be in succTasks,
 * which includes the case where it was already there.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_STATUS_ERROR when the task was
 * already in succTasks, or TSDB_CODE_QRY_OUT_OF_MEMORY (with *moved == false)
 * for any other insert failure.
 */
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // give the caller a defined value on every path, as schMoveTaskToFailList does
  *moved = false;

  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  } else {
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
  }

  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      *moved = true;
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
708 709
/*
 * Move a task from the job's execTasks hash to its failTasks hash.
 *
 * A failed removal from execTasks is only logged as a warning. *moved starts
 * as false and becomes true once the task is known to be in failTasks,
 * including when it was already there.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_STATUS_ERROR when the task was
 * already in failTasks, or TSDB_CODE_QRY_OUT_OF_MEMORY for any other insert
 * failure.
 */
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  *moved = false;

  int32_t rmCode = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (0 != rmCode) {
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t putCode = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == putCode) {
    *moved = true;
    SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(putCode)) {
    *moved = true;
    SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);

  return TSDB_CODE_QRY_OUT_OF_MEMORY;
}

D
dapan1121 已提交
735 736
/*
 * Move a task out of the job's succTasks hash and back into its execTasks hash.
 *
 * pJob  - job that owns the succTasks / execTasks hashes
 * pTask - task to move; the hash key is pTask->taskId
 * moved - out: false on entry; set to true once the task is known to be in
 *         execTasks (freshly inserted, or already present)
 *
 * A failed removal from succTasks is only logged. Returns TSDB_CODE_SUCCESS
 * after a successful insert; reports TSDB_CODE_SCH_STATUS_ERROR when the task
 * was already in execTasks and TSDB_CODE_QRY_OUT_OF_MEMORY for any other
 * taosHashPut failure (both through SCH_ERR_RET).
 */
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
  // Give the out-parameter a defined value on every path (the put-failure path
  // below does not touch it), matching schMoveTaskToFailList.
  *moved = false;

  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
  }

  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      // Already present in execTasks: counts as moved, reported as a status error.
      *moved = true;

      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *moved = true;

  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
760
/*
 * Decide whether a failed task should be retried, and count the attempt.
 *
 * pJob      - job the task belongs to
 * pTask     - failed task; pTask->tryTimes is incremented unconditionally
 * errCode   - error the task failed with
 * needRetry - out: true only when every check below passes
 *
 * No retry is requested when the job needs to stop, when tryTimes has reached
 * REQUEST_MAX_TRY_TIMES, when errCode is not accepted by
 * NEED_SCHEDULER_RETRY_ERROR, or when there is no untried endpoint/candidate
 * left. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int8_t jobStatus = 0;

  ++pTask->tryTimes;

  // Default answer; only the fall-through at the bottom flips it to true.
  *needRetry = false;

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(jobStatus));
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) {
    SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  // TODO CHECK epList/candidateList
  if (SCH_IS_DATA_SRC_TASK(pTask)) {
    // Data source tasks: give up once the try count reaches the endpoint count.
    if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
      SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes,
                    SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
      return TSDB_CODE_SUCCESS;
    }
  } else {
    // Other tasks: give up when there is no candidate after the current index.
    int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

    if ((pTask->candidateIdx + 1) >= candidateNum) {
      SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                    pTask->candidateIdx, candidateNum);
      return TSDB_CODE_SUCCESS;
    }
  }

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}

/*
 * Reset a failed task and launch it again on its next endpoint/candidate.
 *
 * Steps, in order: decrement the level's launched-task counter, remove the
 * task from the exec list, mark it NOT_START, release its flow-control quota
 * and launch queued flow-control tasks (only when flow control applies),
 * advance to the next endpoint (data source task) or candidate index (other
 * tasks), then launch the task. The first failing step is reported through
 * SCH_ERR_RET; otherwise TSDB_CODE_SUCCESS is returned.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  SSchLevel *pLevel = pTask->level;
  atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);

  SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask));
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  // Pick where the retry goes.
  if (!SCH_IS_DATA_SRC_TASK(pTask)) {
    pTask->candidateIdx++;
  } else {
    SCH_SWITCH_EPSET(&pTask->plan->execNode);
  }

  SCH_ERR_RET(schLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
829
/*
 * Overwrite the stored transport info of an existing heartbeat connection.
 *
 * epId  - key into schMgmt.hbConnections
 * trans - new transport info; copied into the entry under its write lock
 *
 * Reports TSDB_CODE_QRY_APP_ERROR (through SCH_ERR_RET) when no entry exists
 * for epId, TSDB_CODE_SUCCESS otherwise.
 */
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  SSchHbTrans *pHb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (NULL == pHb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  // Replace the whole transport record while holding the entry's write lock.
  SCH_LOCK(SCH_WRITE, &pHb->lock);
  memcpy(&pHb->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &pHb->lock);

  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId,
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865
/*
 * Record an error code on the job, keeping the most relevant one.
 *
 * A success errCode is ignored. If the job has no error yet, errCode is
 * installed with a compare-and-exchange. If the job already has an error, it
 * is replaced only when the stored code does not satisfy
 * NEED_CLIENT_HANDLE_ERROR and the new one does. A debug line is logged
 * whenever the stored code was changed by this call.
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SUCCESS == errCode) {
    return;
  }

  bool    updated = false;
  int32_t curCode = atomic_load_32(&pJob->errCode);

  if (TSDB_CODE_SUCCESS == curCode) {
    if (curCode == atomic_val_compare_exchange_32(&pJob->errCode, curCode, errCode)) {
      updated = true;
    } else {
      // The exchange did not return the value we expected: re-read the code
      // that is stored now.
      curCode = atomic_load_32(&pJob->errCode);
    }
  }

  if (!updated && !NEED_CLIENT_HANDLE_ERROR(curCode) && NEED_CLIENT_HANDLE_ERROR(errCode)) {
    atomic_store_32(&pJob->errCode, errCode);
    updated = true;
  }

  if (updated) {
    SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
  }
}

/*
 * Common job-failure path shared by the FAILED and DROPPING transitions.
 *
 * pJob    - job being failed
 * status  - job status to switch to
 * errCode - error to record (merged by schUpdateJobErrCode)
 *
 * Stops early (through SCH_ERR_RET) when the status change is rejected.
 * Otherwise records the error, posts rspSem when a user fetch is pending or
 * the job is scheduled synchronously, and returns the job's stored error code.
 */
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
  // if already FAILED, no more processing
  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));

  schUpdateJobErrCode(pJob, errCode);

  const bool needPost = atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule;
  if (needPost) {
    tsem_post(&pJob->rspSem);
  }

  // Report the code stored on the job, which may differ from errCode.
  int32_t jobCode = atomic_load_32(&pJob->errCode);

  SCH_JOB_DLOG("job failed with error: %s", tstrerror(jobCode));

  SCH_RET(jobCode);
}

D
dapan1121 已提交
896
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
897 898 899 900 901 902 903 904 905
/*
 * Fail the job: forwards to schProcessOnJobFailureImpl with
 * JOB_TASK_STATUS_FAILED and returns whatever it returns.
 */
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode));
}

// Note: no further error processing is needed by the caller; it is handled inside this function.
// Drop the job: forwards to schProcessOnJobFailureImpl with
// JOB_TASK_STATUS_DROPPING and returns whatever it returns.
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode));
}

D
dapan1121 已提交
906
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
907 908
/*
 * Switch the job to PARTIAL_SUCCEED and serve anyone waiting on it.
 *
 * Stops early (through SCH_ERR_RET) when the status change is rejected. For
 * synchronously scheduled jobs rspSem is posted. When a user fetch is already
 * pending, a remote fetch is started; if that fails the job is failed with
 * the fetch error and that result is returned.
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  int32_t code = 0;

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED));

  if (pJob->attr.syncSchedule) {
    tsem_post(&pJob->rspSem);
  }

  const bool fetchPending = atomic_load_8(&pJob->userFetch);
  if (fetchPending) {
    SCH_ERR_JRET(schFetchFromRemote(pJob));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnJobFailure(pJob, code));
}

927
/*
 * Mark the remote fetch as finished and post the job's response semaphore.
 */
void schProcessOnDataFetched(SSchJob *job) {
  // Clear the in-flight marker only if it is currently set (1 -> 0).
  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

  tsem_post(&job->rspSem);
}

D
dapan1121 已提交
932
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
933
/*
 * Handle the failure of one task: retry it, or fail the job.
 *
 * pJob    - job the task belongs to
 * pTask   - task that failed
 * errCode - error the task failed with
 *
 * If the job already needs to stop, nothing is done and the job's stored
 * error code is returned. If schTaskCheckSetRetry asks for a retry, the task
 * is relaunched and TSDB_CODE_SUCCESS is returned. Otherwise the task is moved
 * to the fail list and marked FAILED. For "wait all" jobs the level's failure
 * counter is bumped and errCode is returned while other tasks of the level
 * are still outstanding. In every remaining case, including any internal
 * error above, the job is failed with errCode.
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  int8_t jobStatus = 0;

  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(jobStatus));
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  bool    needRetry = false;
  bool    moved = false;
  int32_t code = 0;

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

  // Only a task that is currently executing can be moved to the fail list.
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
    SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);

  if (SCH_IS_WAIT_ALL_JOB(pJob)) {
    SSchLevel *pLevel = pTask->level;
    int32_t    doneCnt = 0;

    SCH_LOCK(SCH_WRITE, &pLevel->lock);
    pLevel->taskFailed++;
    doneCnt = pLevel->taskSucceed + pLevel->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pLevel->lock);

    schUpdateJobErrCode(pJob, errCode);

    if (doneCnt < pLevel->taskNum) {
      SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", doneCnt, pLevel->taskNum);
      SCH_RET(errCode);
    }
  }

_return:

  // The job is failed with the task's original error, not the internal `code`.
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
}

D
dapan1121 已提交
986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011
/*
 * After a task finished, launch the childless tasks of the next level once
 * the whole current level is done.
 *
 * Only applies to query jobs. The level's taskDoneNum is incremented; when it
 * reaches the level's taskNum, pJob->levelIdx is decremented and every task of
 * that level without children is launched. Tasks that have children are
 * skipped here. A launch failure is reported through SCH_ERR_RET.
 */
int32_t schLaunchNextLevelTasks(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  SSchLevel *pCurLevel = pTask->level;
  int32_t    doneNum = atomic_add_fetch_32(&pCurLevel->taskDoneNum, 1);
  if (doneNum != pCurLevel->taskNum) {
    // Other tasks of this level are still outstanding.
    return TSDB_CODE_SUCCESS;
  }

  pJob->levelIdx--;

  SSchLevel *pNextLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
  for (int32_t i = 0; i < pNextLevel->taskNum; ++i) {
    SSchTask *pNext = taosArrayGet(pNextLevel->subTasks, i);

    bool hasChildren = pNext->children && taosArrayGetSize(pNext->children) > 0;
    if (!hasChildren) {
      SCH_ERR_RET(schLaunchTask(pJob, pNext));
    }
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1012
// Note: no more task error processing, handled in function internal
D
dapan1121 已提交
1013
/*
 * Handle the success of one task.
 *
 * The task is moved to the succ list, marked PARTIAL_SUCCEED, its succeed
 * node is recorded and queued flow-control tasks are launched. Then:
 *   - Task without parents, job is not "wait all": it becomes the job's fetch
 *     task (resNode/fetchTask are set), is moved back to the exec list, and
 *     the job goes to partial success.
 *   - Task without parents, "wait all" job: the level's success counter is
 *     bumped; once all tasks of the level are done the job either fails (some
 *     task of the level failed) or goes to partial success.
 *   - Task with parents: each parent gets this task registered as a
 *     downstream source and is launched when SCH_TASK_READY_FOR_LAUNCH says
 *     so; finally the next level is considered for launch.
 *
 * Errors taken through SCH_ERR_JRET fail the job; errors taken through
 * SCH_ERR_RET are returned directly.
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  bool    moved = false;
  int32_t code = 0;

  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);

  SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));

  SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum == 0) {
    if (!SCH_IS_WAIT_ALL_JOB(pJob)) {
      // This task's output is what the job will fetch from.
      pJob->resNode = pTask->succeedAddr;
      pJob->fetchTask = pTask;

      SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved));

      SCH_RET(schProcessOnJobPartialSuccess(pJob));
    }

    SSchLevel *pLevel = pTask->level;
    int32_t    doneCnt = 0;

    SCH_LOCK(SCH_WRITE, &pLevel->lock);
    pLevel->taskSucceed++;
    doneCnt = pLevel->taskSucceed + pLevel->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pLevel->lock);

    if (doneCnt < pLevel->taskNum) {
      SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", doneCnt, pLevel->taskNum);
      return TSDB_CODE_SUCCESS;
    }

    if (doneCnt > pLevel->taskNum) {
      // Logged only; processing continues as if the level were complete.
      SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", doneCnt, pLevel->taskNum);
    }

    if (pLevel->taskFailed > 0) {
      SCH_RET(schProcessOnJobFailure(pJob, 0));
    } else {
      SCH_RET(schProcessOnJobPartialSuccess(pJob));
    }
  }

  /*
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
      strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
      job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;

      ++job->dataSrcEps.numOfEps;
    }
  */

  for (int32_t idx = 0; idx < parentNum; ++idx) {
    SSchTask *pParent = *(SSchTask **)taosArrayGet(pTask->parents, idx);
    int32_t   nReady = atomic_add_fetch_32(&pParent->childReady, 1);

    // Register this task as a downstream source of the parent's subplan.
    SCH_LOCK(SCH_WRITE, &pParent->lock);
    SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
                                    .taskId = pTask->taskId,
                                    .schedId = schMgmt.sId,
                                    .addr = pTask->succeedAddr};
    qSetSubplanExecutionNode(pParent->plan, pTask->plan->id.groupId, &source);
    SCH_UNLOCK(SCH_WRITE, &pParent->lock);

    if (SCH_TASK_READY_FOR_LAUNCH(nReady, pParent)) {
      SCH_ERR_RET(schLaunchTask(pJob, pParent));
    }
  }

  SCH_ERR_RET(schLaunchNextLevelTasks(pJob, pTask));

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnJobFailure(pJob, code));
}
D
dapan 已提交
1093

D
dapan1121 已提交
1094 1095 1096
// Note: no further error processing is needed by the caller; it is handled inside this function.
//
// Send a fetch request for the job's fetch task unless one is already in
// flight or a result is already stored.
//
// The in-flight marker remoteFetch is claimed with a 0 -> 1 exchange. If a
// fetch is already running, or resData is already set, TSDB_CODE_SUCCESS is
// returned without sending anything. If sending fails, the marker is released
// and the fetch task goes through the task-failure path.
int32_t schFetchFromRemote(SSchJob *pJob) {
  int32_t code = 0;

  int32_t prevFlag = atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1);
  if (0 != prevFlag) {
    SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch));
    return TSDB_CODE_SUCCESS;
  }

  void *pCached = atomic_load_ptr(&pJob->resData);
  if (NULL != pCached) {
    // Nothing to fetch: release the marker we just claimed.
    atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

    SCH_JOB_DLOG("res already fetched, res:%p", pCached);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));

  return TSDB_CODE_SUCCESS;

_return:

  atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}

D
dapan1121 已提交
1122 1123
/*
 * Publish a finished explain result on the job.
 *
 * pJob  - job receiving the result
 * pTask - task to mark as SUCCEED
 * pRsp  - explain result; stored as the job's resData (the pointer is kept,
 *         not copied), and its numOfRows (passed through htonl) becomes
 *         resNumOfRows
 *
 * Finishes by signalling that data has been fetched. Always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);

  // Row count first, then the data pointer.
  atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
  atomic_store_ptr(&pJob->resData, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);

  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}

1135 1136
/*
 * Append the table version info carried by a res-ready response to the job.
 *
 * pJob - job whose queryRes array (of STbVerInfo) receives the entry; the
 *        array is created on first use, sized by pJob->taskNum
 * rsp  - response; ignored when its tbFName is empty
 *
 * Returns TSDB_CODE_SUCCESS, or reports TSDB_CODE_OUT_OF_MEMORY (through
 * SCH_ERR_RET) when the array cannot be created or grown.
 */
int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) {
  if (0 == rsp->tbFName[0]) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pJob->queryRes) {
    pJob->queryRes = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
    if (NULL == pJob->queryRes) {
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  // Zero-initialise so no indeterminate bytes are copied into the array and
  // the name stays NUL-terminated after the bounded copy below.
  STbVerInfo tbInfo = {0};

  // The name comes from a response message: never copy more than the
  // destination can hold.
  // NOTE(review): relies on STbVerInfo.tbFName being a char array (sizeof is
  // its capacity) - confirm against the struct definition.
  strncpy(tbInfo.tbFName, rsp->tbFName, sizeof(tbInfo.tbFName) - 1);
  tbInfo.sversion = rsp->sversion;
  tbInfo.tversion = rsp->tversion;

  if (NULL == taosArrayPush((SArray *)pJob->queryRes, &tbInfo)) {
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1156
// Note: no more task error processing, handled in function internal
L
Liu Jicong 已提交
1157 1158
/*
 * Process one response message for a task, dispatching on msgType.
 *
 * pJob    - job the task belongs to
 * pTask   - task the response is for
 * msgType - response type (TDMT_VND_*_RSP or TDMT_SCH_LINK_BROKEN)
 * msg     - response payload, may be NULL for some types
 * msgSize - payload size in bytes
 * rspCode - transport/remote result code delivered with the response
 *
 * If the job already needs to stop, the response is dropped and the job's
 * stored error code is returned. Otherwise the payload is decoded and
 * checked per type, and on success the task moves forward (task success,
 * explain done, or data fetched). Errors taken through SCH_ERR_JRET are
 * routed to schProcessOnTaskFailure; errors coming back from
 * schProcessOnTaskSuccess are returned as-is through SCH_ERR_RET.
 */
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
                             int32_t rspCode) {
  int32_t code = 0;
  int8_t  status = 0;

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
                  rspCode);
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE_RSP: {
      SVCreateTbBatchRsp batchRsp = {0};
      if (msg) {
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
          // The first per-table error fails the whole task.
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
            SVCreateTbRsp *rsp = batchRsp.pRsps + i;
            if (TSDB_CODE_SUCCESS != rsp->code) {
              code = rsp->code;
              tDecoderClear(&coder);
              SCH_ERR_JRET(code);
            }
          }
        }
        tDecoderClear(&coder);
        SCH_ERR_JRET(code);
      }

      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
    case TDMT_VND_DROP_TABLE_RSP: {
      SVDropTbBatchRsp batchRsp = {0};
      if (msg) {
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
          // The first per-table error fails the whole task.
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
            SVDropTbRsp *rsp = batchRsp.pRsps + i;
            if (TSDB_CODE_SUCCESS != rsp->code) {
              code = rsp->code;
              tDecoderClear(&coder);
              SCH_ERR_JRET(code);
            }
          }
        }
        tDecoderClear(&coder);
        SCH_ERR_JRET(code);
      }

      SCH_ERR_JRET(rspCode);
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
    case TDMT_VND_ALTER_TABLE_RSP: {
      SVAlterTbRsp rsp = {0};
      if (msg) {
        SDecoder coder = {0};
        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSVAlterTbRsp(&coder, &rsp);
        tDecoderClear(&coder);
        SCH_ERR_JRET(code);
        SCH_ERR_JRET(rsp.code);
      }

      SCH_ERR_JRET(rspCode);

      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
      break;
    }
    case TDMT_VND_SUBMIT_RSP: {
      SCH_ERR_JRET(rspCode);

      if (msg) {
        SDecoder    coder = {0};
        SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp));
        if (NULL == rsp) {
          SCH_TASK_ELOG("malloc submitRsp failed, size:%d", (int32_t)sizeof(*rsp));
          SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
        // Start from a zeroed struct so that freeing it after a failed decode
        // never sees indeterminate fields.
        memset(rsp, 0, sizeof(*rsp));

        tDecoderInit(&coder, msg, msgSize);
        code = tDecodeSSubmitRsp(&coder, rsp);
        if (code) {
          SCH_TASK_ELOG("decode submitRsp failed, code:%d", code);
          tFreeSSubmitRsp(rsp);
          SCH_ERR_JRET(code);
        }

        if (rsp->nBlocks > 0) {
          // The first per-block error fails the whole task.
          for (int32_t i = 0; i < rsp->nBlocks; ++i) {
            SSubmitBlkRsp *blk = rsp->pBlocks + i;
            if (TSDB_CODE_SUCCESS != blk->code) {
              code = blk->code;
              tFreeSSubmitRsp(rsp);
              SCH_ERR_JRET(code);
            }
          }
        }

        atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
        SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);

        SCH_LOCK(SCH_WRITE, &pJob->resLock);
        if (pJob->queryRes) {
          // Merge this response into the one already stored on the job.
          SSubmitRsp *sum = pJob->queryRes;

          if (rsp->nBlocks > 0) {
            // Grow through a temporary so the existing array survives a failed
            // realloc, and only update the counters once the grow succeeded.
            void *grown = taosMemoryRealloc(sum->pBlocks, ((size_t)sum->nBlocks + (size_t)rsp->nBlocks) * sizeof(*sum->pBlocks));
            if (NULL == grown) {
              SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
              SCH_TASK_ELOG("realloc submitRsp blocks failed, errno:%d", errno);
              tFreeSSubmitRsp(rsp);
              SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
            }

            sum->pBlocks = grown;
            memcpy(sum->pBlocks + sum->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
            sum->nBlocks += rsp->nBlocks;
          }

          sum->affectedRows += rsp->affectedRows;

          // The block entries were copied by value above; release only the
          // containers of the merged response.
          taosMemoryFree(rsp->pBlocks);
          taosMemoryFree(rsp);
        } else {
          // First response: the job takes ownership of it.
          pJob->queryRes = rsp;
        }
        SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
      }

      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

      break;
    }
    case TDMT_VND_QUERY_RSP: {
      SQueryTableRsp rsp = {0};
      if (msg) {
        SCH_ERR_JRET(tDeserializeSQueryTableRsp(msg, msgSize, &rsp));
        SCH_ERR_JRET(rsp.code);
      }

      SCH_ERR_JRET(rspCode);

      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      // SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));

      break;
    }
    case TDMT_VND_RES_READY_RSP: {
      SResReadyRsp *rsp = (SResReadyRsp *)msg;

      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }
      SCH_ERR_JRET(rsp->code);

      SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));

      SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

      break;
    }
    case TDMT_VND_EXPLAIN_RSP: {
      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (!SCH_IS_EXPLAIN_JOB(pJob)) {
        SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (pJob->resData) {
        SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      SExplainRsp rsp = {0};
      if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
        taosMemoryFree(rsp.subplanInfo);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SRetrieveTableRsp *pRsp = NULL;
      SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));

      // A non-NULL pRsp means the explain output is complete.
      if (pRsp) {
        SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
      }
      break;
    }
    case TDMT_VND_FETCH_RSP: {
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;

      SCH_ERR_JRET(rspCode);
      if (NULL == msg) {
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (SCH_IS_EXPLAIN_JOB(pJob)) {
        if (rsp->completed) {
          SRetrieveTableRsp *pRsp = NULL;
          SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
          if (pRsp) {
            SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
          }

          return TSDB_CODE_SUCCESS;
        }

        // Not complete yet: release the in-flight marker and fetch again.
        atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);

        SCH_ERR_JRET(schFetchFromRemote(pJob));

        return TSDB_CODE_SUCCESS;
      }

      if (pJob->resData) {
        SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData);
        taosMemoryFreeClear(rsp);
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
      }

      atomic_store_ptr(&pJob->resData, rsp);
      atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));

      if (rsp->completed) {
        SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
      }

      SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

      schProcessOnDataFetched(pJob);
      break;
    }
    case TDMT_VND_DROP_TASK_RSP: {
      // SHOULD NEVER REACH HERE
      SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
    }
    case TDMT_SCH_LINK_BROKEN:
      SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
      SCH_ERR_JRET(rspCode);
      break;
    default:
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}

D
dapan1121 已提交
1413
/*
 * Look up a task by id in a task hash.
 *
 * pTaskList - hash keyed by task id whose values are SSchTask pointers
 * taskId    - id to look for
 * pTask     - out: set to the task when found; left untouched otherwise
 *
 * Always returns TSDB_CODE_SUCCESS; callers tell "found" from "not found" by
 * inspecting *pTask.
 */
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
  int32_t listSize = taosHashGetSize(pTaskList);
  if (listSize <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSchTask **ppFound = taosHashGet(pTaskList, &taskId, sizeof(taskId));
  if (NULL != ppFound && NULL != (*ppFound)) {
    *pTask = *ppFound;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1429
/*
 * Store the response handle on the task's exec node, if it has exactly one.
 *
 * Nothing is changed when rspCode is non-zero, when the task has no execNodes
 * array, or when that array does not hold exactly one entry. Always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
  if (rspCode) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pTask->execNodes) {
    return TSDB_CODE_SUCCESS;
  }

  if (taosArrayGetSize(pTask->execNodes) != 1) {
    return TSDB_CODE_SUCCESS;
  }

  SSchNodeInfo *pOnlyNode = taosArrayGet(pTask->execNodes, 0);
  pOnlyNode->handle = handle;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1441
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
L
Liu Jicong 已提交
1442
  int32_t                code = 0;
D
dapan1121 已提交
1443
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
X
Xiaoyu Wang 已提交
1444
  SSchTask              *pTask = NULL;
L
Liu Jicong 已提交
1445

D
dapan1121 已提交
1446
  SSchJob *pJob = schAcquireJob(pParam->refId);
D
dapan1121 已提交
1447
  if (NULL == pJob) {
D
dapan1121 已提交
1448
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64,
L
Liu Jicong 已提交
1449
          pParam->queryId, pParam->taskId, pParam->refId);
D
dapan1121 已提交
1450
    SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
1451 1452
  }

D
dapan1121 已提交
1453 1454 1455 1456 1457
  schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
  if (NULL == pTask) {
    if (TDMT_VND_EXPLAIN_RSP == msgType) {
      schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
    } else {
H
Hongze Cheng 已提交
1458 1459
      SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                   pParam->taskId);
D
dapan1121 已提交
1460 1461
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
D
dapan1121 已提交
1462
  }
H
Hongze Cheng 已提交
1463

D
dapan1121 已提交
1464
  if (NULL == pTask) {
H
Hongze Cheng 已提交
1465 1466
    SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
                 pParam->taskId);
D
dapan1121 已提交
1467 1468
    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }
D
dapan1121 已提交
1469

D
dapan1121 已提交
1470
  SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
L
Liu Jicong 已提交
1471

L
Liu Jicong 已提交
1472
  SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
D
dapan1121 已提交
1473
  schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
D
dapan1121 已提交
1474
  SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
D
dapan1121 已提交
1475

H
Haojun Liao 已提交
1476
_return:
D
dapan1121 已提交
1477
  if (pJob) {
D
dapan1121 已提交
1478
    schReleaseJob(pParam->refId);
D
dapan1121 已提交
1479 1480
  }

wafwerar's avatar
wafwerar 已提交
1481
  taosMemoryFreeClear(param);
D
dapan1121 已提交
1482 1483 1484
  SCH_RET(code);
}

L
Liu Jicong 已提交
1485
/* Submit response callback: forwards to schHandleCallback as TDMT_VND_SUBMIT_RSP. */
int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_SUBMIT_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}
H
Haojun Liao 已提交
1488

L
Liu Jicong 已提交
1489
/* Create-table response callback: forwards to schHandleCallback as TDMT_VND_CREATE_TABLE_RSP. */
int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_CREATE_TABLE_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}

X
Xiaoyu Wang 已提交
1493 1494 1495 1496
/* Drop-table response callback: forwards to schHandleCallback as TDMT_VND_DROP_TABLE_RSP. */
int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_DROP_TABLE_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}

X
Xiaoyu Wang 已提交
1497 1498 1499 1500
/* Alter-table response callback: forwards to schHandleCallback as TDMT_VND_ALTER_TABLE_RSP. */
int32_t schHandleAlterTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_ALTER_TABLE_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}

L
Liu Jicong 已提交
1501
/* Query response callback: forwards to schHandleCallback as TDMT_VND_QUERY_RSP. */
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_QUERY_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}
H
Haojun Liao 已提交
1504

L
Liu Jicong 已提交
1505
/* Fetch response callback: forwards to schHandleCallback as TDMT_VND_FETCH_RSP. */
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_FETCH_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}
H
Haojun Liao 已提交
1508

L
Liu Jicong 已提交
1509
/* Result-ready response callback: forwards to schHandleCallback as TDMT_VND_RES_READY_RSP. */
int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_RES_READY_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}
H
Haojun Liao 已提交
1512

D
dapan1121 已提交
1513 1514 1515 1516
/* Explain response callback: forwards to schHandleCallback as TDMT_VND_EXPLAIN_RSP. */
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  const int32_t rspType = TDMT_VND_EXPLAIN_RSP;
  return schHandleCallback(param, pMsg, rspType, code);
}

L
Liu Jicong 已提交
1517
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1518
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
L
Liu Jicong 已提交
1519
  qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
1520
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1521 1522
}

L
Liu Jicong 已提交
1523
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
1524 1525 1526
  SSchedulerHbRsp rsp = {0};
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;

D
dapan1121 已提交
1527 1528
  if (code) {
    qError("hb rsp error:%s", tstrerror(code));
D
dapan1121 已提交
1529
    SCH_ERR_JRET(code);
D
dapan1121 已提交
1530
  }
L
Liu Jicong 已提交
1531

D
dapan1121 已提交
1532 1533
  if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
    qError("invalid hb rsp msg, size:%d", pMsg->len);
D
dapan1121 已提交
1534
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
1535 1536
  }

D
dapan1121 已提交
1537 1538 1539
  SSchTrans trans = {0};
  trans.transInst = pParam->transport;
  trans.transHandle = pMsg->handle;
L
Liu Jicong 已提交
1540

D
dapan1121 已提交
1541
  SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
D
dapan1121 已提交
1542 1543

  int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
L
Liu Jicong 已提交
1544 1545
  qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
         rsp.epId.ep.port);
D
dapan1121 已提交
1546

D
dapan1121 已提交
1547 1548
  for (int32_t i = 0; i < taskNum; ++i) {
    STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
L
Liu Jicong 已提交
1549

D
dapan1121 已提交
1550 1551
    SSchJob *pJob = schAcquireJob(taskStatus->refId);
    if (NULL == pJob) {
L
Liu Jicong 已提交
1552 1553 1554
      qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
            taskStatus->queryId, taskStatus->taskId);
      // TODO DROP TASK FROM SERVER!!!!
D
dapan1121 已提交
1555 1556
      continue;
    }
L
Liu Jicong 已提交
1557

D
dapan1121 已提交
1558
    // TODO
L
Liu Jicong 已提交
1559 1560 1561

    SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId,
                 jobTaskStatusStr(taskStatus->status));
D
dapan1121 已提交
1562 1563 1564 1565 1566 1567 1568

    schReleaseJob(taskStatus->refId);
  }

_return:

  tFreeSSchedulerHbRsp(&rsp);
D
dapan1121 已提交
1569
  taosMemoryFree(param);
D
dapan1121 已提交
1570 1571 1572 1573

  SCH_RET(code);
}

D
dapan1121 已提交
1574 1575 1576 1577
/*
 * Broken-link callback.
 *
 * The broken handle is released first. For a task parameter the event is
 * forwarded to schHandleCallback as TDMT_SCH_LINK_BROKEN. For a heartbeat
 * parameter the heartbeat connection of that node is updated with a NULL
 * handle and a new heartbeat message is sent.
 */
int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  SSchCallbackParamHeader *pHeader = (SSchCallbackParamHeader *)param;

  rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
  qDebug("handle %p is broken", pMsg->handle);

  if (!pHeader->isHbParam) {
    SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
    return TSDB_CODE_SUCCESS;
  }

  SSchHbCallbackParam *pHbParam = (SSchHbCallbackParam *)param;
  SSchTrans            resetTrans = {.transInst = pHbParam->transport, .transHandle = NULL};

  SCH_ERR_RET(schUpdateHbConnection(&pHbParam->nodeEpId, &resetTrans));
  SCH_ERR_RET(schBuildAndSendHbMsg(&pHbParam->nodeEpId));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1593
/*
 * Map a request message type to the callback that handles its response.
 *
 * On success *fp receives the callback and TSDB_CODE_SUCCESS is returned.
 * For an unknown message type *fp is left untouched and
 * TSDB_CODE_QRY_APP_ERROR is returned.
 */
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
  __async_send_cb_fn_t handler = NULL;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
      handler = schHandleCreateTableCallback;
      break;
    case TDMT_VND_DROP_TABLE:
      handler = schHandleDropTableCallback;
      break;
    case TDMT_VND_ALTER_TABLE:
      handler = schHandleAlterTableCallback;
      break;
    case TDMT_VND_SUBMIT:
      handler = schHandleSubmitCallback;
      break;
    case TDMT_VND_QUERY:
      handler = schHandleQueryCallback;
      break;
    case TDMT_VND_RES_READY:
      handler = schHandleReadyCallback;
      break;
    case TDMT_VND_EXPLAIN:
      handler = schHandleExplainCallback;
      break;
    case TDMT_VND_FETCH:
      handler = schHandleFetchCallback;
      break;
    case TDMT_VND_DROP_TASK:
      handler = schHandleDropCallback;
      break;
    case TDMT_VND_QUERY_HEARTBEAT:
      handler = schHandleHbCallback;
      break;
    case TDMT_SCH_LINK_BROKEN:
      handler = schHandleLinkBrokenCallback;
      break;
    default:
      qError("unknown msg type for callback, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  *fp = handler;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1636
/*
 * Allocate an SMsgSendInfo together with its SSchTaskCallbackParam for the
 * given task and message type.
 *
 * The parameter carries the job's queryId, refId and transport plus the task
 * id; the callback is resolved through schGetCallbackFp. On success the new
 * object is stored in *pMsgSendInfo. On failure both allocations are freed
 * and *pMsgSendInfo is not written.
 */
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
  int32_t                code = 0;
  SSchTaskCallbackParam *pCbParam = NULL;
  __async_send_cb_fn_t   cbFn = NULL;

  SMsgSendInfo *pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pCbParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == pCbParam) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGetCallbackFp(msgType, &cbFn));

  pCbParam->queryId = pJob->queryId;
  pCbParam->refId = pJob->refId;
  pCbParam->taskId = SCH_TASK_ID(pTask);
  pCbParam->transport = pJob->transport;

  pSendInfo->param = pCbParam;
  pSendInfo->fp = cbFn;

  *pMsgSendInfo = pSendInfo;
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFree(pCbParam);
  taosMemoryFree(pSendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1673
void schFreeRpcCtxVal(const void *arg) {
D
dapan1121 已提交
1674 1675 1676
  if (NULL == arg) {
    return;
  }
L
Liu Jicong 已提交
1677 1678

  SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
D
dapan1121 已提交
1679 1680
  taosMemoryFreeClear(pMsgSendInfo->param);
  taosMemoryFreeClear(pMsgSendInfo);
D
dapan1121 已提交
1681
}
D
dapan1121 已提交
1682

D
dapan1121 已提交
1683
/*
 * Allocate a zeroed SSchTaskCallbackParam and fill it with the job's queryId,
 * refId and transport plus the task id. The result is stored in *pParam.
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY when the allocation fails.
 */
int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchTaskCallbackParam *pTaskParam = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));

  if (NULL != pTaskParam) {
    pTaskParam->queryId = pJob->queryId;
    pTaskParam->refId = pJob->refId;
    pTaskParam->taskId = SCH_TASK_ID(pTask);
    pTaskParam->transport = pJob->transport;

    *pParam = pTaskParam;
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
  SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);

  return TSDB_CODE_SUCCESS;
}

/*
 * Allocate a zeroed SSchHbCallbackParam flagged as a heartbeat parameter.
 *
 * The node id and current endpoint are copied from the task's candidate
 * address at candidateIdx; the transport comes from the job. The result is
 * stored in *pParam. Returns TSDB_CODE_QRY_OUT_OF_MEMORY when the allocation
 * fails.
 */
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
  SSchHbCallbackParam *pHbParam = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (NULL == pHbParam) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SQueryNodeAddr *pCandidate = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  pHbParam->head.isHbParam = true;
  pHbParam->transport = pJob->transport;
  pHbParam->nodeEpId.nodeId = pCandidate->nodeId;
  memcpy(&pHbParam->nodeEpId.ep, SCH_GET_CUR_EP(pCandidate), sizeof(SEp));

  *pParam = pHbParam;

  return TSDB_CODE_SUCCESS;
}

/*
 * Fill brokenVal with the context used when a connection breaks.
 *
 * An SMsgSendInfo is allocated whose param is either a heartbeat parameter
 * (isHb == true) or a task parameter, and whose callback is the one
 * registered for TDMT_SCH_LINK_BROKEN. brokenVal receives that object together
 * with the clone and free functions for it.
 *
 * On failure everything allocated here is freed and brokenVal is not written.
 */
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
  int32_t       code = 0;
  SMsgSendInfo *pMsgSendInfo = NULL;

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (isHb) {
    SCH_ERR_JRET(schMakeHbCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  } else {
    SCH_ERR_JRET(schMakeTaskCallbackParam(pJob, pTask, &pMsgSendInfo->param));
  }

  int32_t              msgType = TDMT_SCH_LINK_BROKEN;
  __async_send_cb_fn_t fp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  pMsgSendInfo->fp = fp;

  brokenVal->msgType = msgType;
  brokenVal->val = pMsgSendInfo;
  brokenVal->clone = schCloneSMsgSendInfo;
  brokenVal->freeFunc = schFreeRpcCtxVal;

  return TSDB_CODE_SUCCESS;

_return:

  // pMsgSendInfo is still NULL when its own allocation failed; do not
  // dereference it in that case.
  if (pMsgSendInfo) {
    taosMemoryFreeClear(pMsgSendInfo->param);
    taosMemoryFreeClear(pMsgSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
1757
/*
 * Build the rpc context attached to a query request.
 *
 * pCtx->args becomes a hash keyed by response message type that holds the
 * send info for TDMT_VND_RES_READY_RSP and TDMT_VND_EXPLAIN_RSP; the
 * broken-link value is filled for a task (non-heartbeat) parameter.
 *
 * On failure the hash is cleaned up and both send infos are freed.
 * pCtx->args itself is not reset by this function.
 */
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t       code = 0;
  SMsgSendInfo *pReadyInfo = NULL;
  SMsgSendInfo *pExplainInfo = NULL;

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyInfo));
  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainInfo));

  int32_t    readyRspType = TDMT_VND_RES_READY_RSP;
  SRpcCtxVal readyVal = {.val = pReadyInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (0 != taosHashPut(pCtx->args, &readyRspType, sizeof(readyRspType), &readyVal, sizeof(readyVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", readyRspType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  int32_t    explainRspType = TDMT_VND_EXPLAIN_RSP;
  SRpcCtxVal explainVal = {.val = pExplainInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (0 != taosHashPut(pCtx->args, &explainRspType, sizeof(explainRspType), &explainVal, sizeof(explainVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", explainRspType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(pCtx->args);

  if (NULL != pReadyInfo) {
    taosMemoryFreeClear(pReadyInfo->param);
    taosMemoryFreeClear(pReadyInfo);
  }

  if (NULL != pExplainInfo) {
    taosMemoryFreeClear(pExplainInfo->param);
    taosMemoryFreeClear(pExplainInfo);
  }

  SCH_RET(code);
}

/*
 * Build the rpc context attached to a heartbeat request.
 *
 * pCtx->args becomes a hash holding one send info under
 * TDMT_VND_QUERY_HEARTBEAT_RSP; its param is an SSchHbCallbackParam carrying
 * the node endpoint of the task's current candidate address and the job's
 * transport. The broken-link value is filled for a heartbeat parameter.
 *
 * On failure the hash is cleaned up and the send info and param are freed.
 */
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
  int32_t              code = 0;
  SSchHbCallbackParam *pHbParam = NULL;
  SMsgSendInfo        *pSendInfo = NULL;
  SQueryNodeEpId       nodeEp = {0};
  SQueryNodeAddr      *pCandidate = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  nodeEp.nodeId = pCandidate->nodeId;
  memcpy(&nodeEp.ep, SCH_GET_CUR_EP(pCandidate), sizeof(SEp));

  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pCtx->args) {
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pSendInfo) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pHbParam = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
  if (NULL == pHbParam) {
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  int32_t              rspType = TDMT_VND_QUERY_HEARTBEAT_RSP;
  __async_send_cb_fn_t hbCb = NULL;
  SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &hbCb));

  pHbParam->nodeEpId = nodeEp;
  pHbParam->transport = pJob->transport;

  pSendInfo->param = pHbParam;
  pSendInfo->fp = hbCb;

  SRpcCtxVal hbVal = {.val = pSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
  if (0 != taosHashPut(pCtx->args, &rspType, sizeof(rspType), &hbVal, sizeof(hbVal))) {
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", rspType);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(pCtx->args);
  taosMemoryFreeClear(pHbParam);
  taosMemoryFreeClear(pSendInfo);

  SCH_RET(code);
}

D
dapan1121 已提交
1863
/*
 * Register a heartbeat connection entry for epId in schMgmt.hbConnections.
 *
 * A heartbeat rpc context is built for the task and inserted together with
 * the job's transport. When the insert fails the context is freed; if the
 * failure means the entry already exists, *exist is set to true and
 * TSDB_CODE_SUCCESS is returned, otherwise the hash error code is returned.
 */
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  int32_t     code = 0;
  SSchHbTrans hbEntry = {0};

  hbEntry.trans.transInst = pJob->transport;
  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hbEntry.rpcCtx));

  code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hbEntry, sizeof(SSchHbTrans));
  if (0 == code) {
    return TSDB_CODE_SUCCESS;
  }

  // The entry was not stored, so the context built above has to be released.
  schFreeRpcCtx(&hbEntry.rpcCtx);

  if (HASH_NODE_EXIST(code)) {
    *exist = true;
    return TSDB_CODE_SUCCESS;
  }

  qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
  SCH_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

/*
 * Duplicate a callback parameter with a byte-wise copy.
 *
 * The header's isHbParam flag selects whether the source is copied as an
 * SSchHbCallbackParam or as an SSchTaskCallbackParam. The copy is stored in
 * *pDst. Returns TSDB_CODE_QRY_OUT_OF_MEMORY when the allocation fails.
 */
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
  const bool   isHb = pSrc->isHbParam;
  const size_t copyLen = isHb ? sizeof(SSchHbCallbackParam) : sizeof(SSchTaskCallbackParam);

  void *pCopy = taosMemoryMalloc(copyLen);
  if (NULL == pCopy) {
    if (isHb) {
      qError("malloc SSchHbCallbackParam failed");
    } else {
      qError("malloc SSchTaskCallbackParam failed");
    }
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(pCopy, pSrc, copyLen);
  *pDst = (SSchCallbackParamHeader *)pCopy;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1913 1914
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
  SMsgSendInfo *pSrc = src;
L
Liu Jicong 已提交
1915
  int32_t       code = 0;
D
dapan1121 已提交
1916
  SMsgSendInfo *pDst = taosMemoryMalloc(sizeof(*pSrc));
D
dapan1121 已提交
1917
  if (NULL == pDst) {
D
dapan1121 已提交
1918 1919 1920 1921
    qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1922 1923
  memcpy(pDst, pSrc, sizeof(*pSrc));
  pDst->param = NULL;
D
dapan1121 已提交
1924

D
dapan1121 已提交
1925
  SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
D
dapan1121 已提交
1926

D
dapan1121 已提交
1927
  *dst = pDst;
D
dapan1121 已提交
1928

D
dapan1121 已提交
1929
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1930

D
dapan1121 已提交
1931 1932
_return:

D
dapan1121 已提交
1933
  taosMemoryFreeClear(pDst);
D
dapan1121 已提交
1934 1935 1936 1937 1938 1939 1940
  SCH_RET(code);
}

/*
 * Deep-copy a heartbeat rpc context from pSrc into pDst.
 *
 * The broken-link value is copied with a cloned send info, then every entry
 * of pSrc->args is copied into a new hash with its send info cloned as well.
 * When anything fails after the broken-link clone, pDst is released through
 * schFreeRpcCtx.
 */
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
  int32_t code = 0;

  pDst->brokenVal = pSrc->brokenVal;
  pDst->brokenVal.val = NULL;
  SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));

  pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (NULL == pDst->args) {
    qError("taosHashInit %d RpcCtx failed", 1);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // NOTE(review): the error exits below leave the loop without finishing the
  // iteration; check whether the hash API requires cancelling the iterator.
  for (void *pIter = taosHashIterate(pSrc->args, NULL); NULL != pIter; pIter = taosHashIterate(pSrc->args, pIter)) {
    SRpcCtxVal *pOrigin = (SRpcCtxVal *)pIter;
    int32_t    *pRspType = taosHashGetKey(pIter, NULL);

    SRpcCtxVal copy = *pOrigin;
    copy.val = NULL;

    SCH_ERR_JRET(schCloneSMsgSendInfo(pOrigin->val, &copy.val));

    if (0 != taosHashPut(pDst->args, pRspType, sizeof(*pRspType), &copy, sizeof(copy))) {
      qError("taosHashPut msg %d to rpcCtx failed", *pRspType);
      // The clone was not stored in the hash, so release it here.
      (*copy.freeFunc)(copy.val);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  schFreeRpcCtx(pDst);
  SCH_RET(code);
}

L
Liu Jicong 已提交
1978 1979
/*
 * Wrap msg in an SMsgSendInfo with the task's callback and send it
 * asynchronously through the given transport.
 *
 * transport is read as an SSchTrans; its transHandle is placed in the message
 * info and its transInst is used for sending. When building the send info or
 * sending fails, the send info and its param are freed here; msg itself is
 * not freed by this function.
 */
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet *epSet, int32_t msgType, void *msg,
                        uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
  int32_t       code = 0;
  SSchTrans    *pTrans = (SSchTrans *)transport;
  SMsgSendInfo *pSendInfo = NULL;
  int64_t       transporterId = 0;

  SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, msgType, &pSendInfo));

  pSendInfo->msgType = msgType;
  pSendInfo->msgInfo.pData = msg;
  pSendInfo->msgInfo.len = msgSize;
  pSendInfo->msgInfo.handle = pTrans->transHandle;

  qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", TMSG_INFO(msgType),
         ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
         pTrans->transInst, pTrans->transHandle);

  SCH_ERR_JRET(asyncSendMsgToServerExt(pTrans->transInst, epSet, &transporterId, pSendInfo, persistHandle, ctx));

  SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType));
  return TSDB_CODE_SUCCESS;

_return:

  if (NULL != pSendInfo) {
    taosMemoryFreeClear(pSendInfo->param);
    taosMemoryFreeClear(pSendInfo);
  }

  SCH_RET(code);
}

D
dapan1121 已提交
2015 2016
/*
 * Serialize a heartbeat request for nodeEpId and send it on that node's
 * registered heartbeat connection.
 *
 * The connection entry is looked up in schMgmt.hbConnections; its rpc context
 * is cloned and its transport copied while holding the entry's lock. The
 * request is sent with a persistent handle and the cloned context.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR when no connection entry exists for the
 * node, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation/serialization failure, or
 * the error reported by the send call. On failure after the context was
 * cloned, the message buffer, param, send info and cloned context are freed.
 */
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
  SSchedulerHbReq        req = {0};
  int32_t                code = 0;
  SRpcCtx                rpcCtx = {0};
  SSchTrans              trans = {0};
  int32_t                msgType = TDMT_VND_QUERY_HEARTBEAT;
  // Declared up front and NULL-initialized: _return frees all three and can
  // be reached before any of them has been allocated.
  void                  *msg = NULL;
  SMsgSendInfo          *pMsgSendInfo = NULL;
  SSchTaskCallbackParam *param = NULL;

  req.header.vgId = nodeEpId->nodeId;
  req.sId = schMgmt.sId;
  memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));

  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
           nodeEpId->ep.port);
    // code is still 0 here, so an explicit error is required to stop before
    // hb is dereferenced.
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
  memcpy(&trans, &hb->trans, sizeof(trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);

  SCH_ERR_RET(code);

  // First pass computes the encoded size, second pass writes the buffer.
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    qError("calloc hb req %d failed", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    qError("calloc SMsgSendInfo failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
  if (NULL == param) {
    qError("calloc SSchTaskCallbackParam failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  __async_send_cb_fn_t fp = NULL;
  SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));

  param->transport = trans.transInst;

  pMsgSendInfo->param = param;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = trans.transHandle;
  pMsgSendInfo->msgType = msgType;
  pMsgSendInfo->fp = fp;

  int64_t transporterId = 0;
  SEpSet  epSet = {.inUse = 0, .numOfEps = 1};
  memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));

  qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
         nodeEpId->ep.fqdn, nodeEpId->ep.port);

  code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
  if (code) {
    qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
           trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
    SCH_ERR_JRET(code);
  }

  qDebug("hb msg sent");
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFreeClear(msg);
  taosMemoryFreeClear(param);
  taosMemoryFreeClear(pMsgSendInfo);
  schFreeRpcCtx(&rpcCtx);
  SCH_RET(code);
}

D
dapan1121 已提交
2106
/*
 * Build the request body for msgType and send it for the given task.
 *
 * addr    - target node; when NULL the task's candidate address at
 *           candidateIdx is used.
 * msgType - one of the TDMT_VND_* request types handled in the switch below;
 *           any other value yields TSDB_CODE_SCH_INTERNAL_ERROR.
 *
 * Query and heartbeat requests additionally carry an rpc context and use a
 * persistent handle. After a successful query send, the execution node is
 * recorded for the task.
 *
 * On failure after the rpc context or message buffer has been created, both
 * are released and the task's last message type is reset to -1.
 */
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
  uint32_t msgSize = 0;
  void    *msg = NULL;
  int32_t  code = 0;
  bool     persistHandle = false;
  SRpcCtx  rpcCtx = {0};

  if (NULL == addr) {
    addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  }

  SEpSet epSet = addr->epSet;

  switch (msgType) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_SUBMIT: {
      // The task already holds the encoded request; send a private copy.
      msgSize = pTask->msgLen;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      memcpy(msg, pTask->msg, msgSize);
      break;
    }

    case TDMT_VND_QUERY: {
      // schMakeQueryRpcCtx cleans up after itself on failure, so a plain
      // return is used here; later failures must go through _return to
      // release rpcCtx.
      SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));

      uint32_t len = strlen(pJob->sql);
      msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SSubQueryMsg *pMsg = msg;
      pMsg->header.vgId = htonl(addr->nodeId);
      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      pMsg->taskType = TASK_TYPE_TEMP;
      pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
      pMsg->phyLen = htonl(pTask->msgLen);
      pMsg->sqlLen = htonl(len);

      // Payload layout: SQL text first, followed by the task's plan message.
      memcpy(pMsg->msg, pJob->sql, len);
      memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);

      persistHandle = true;
      break;
    }

    case TDMT_VND_RES_READY: {
      msgSize = sizeof(SResReadyReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResReadyReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      break;
    }
    case TDMT_VND_FETCH: {
      msgSize = sizeof(SResFetchReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      SResFetchReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);

      break;
    }
    case TDMT_VND_DROP_TASK: {
      msgSize = sizeof(STaskDropReq);
      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_TASK_ELOG("calloc %d failed", msgSize);
        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      STaskDropReq *pMsg = msg;

      pMsg->header.vgId = htonl(addr->nodeId);

      pMsg->sId = htobe64(schMgmt.sId);
      pMsg->queryId = htobe64(pJob->queryId);
      pMsg->taskId = htobe64(pTask->taskId);
      pMsg->refId = htobe64(pJob->refId);
      break;
    }
    case TDMT_VND_QUERY_HEARTBEAT: {
      // As for the query case: failures after this point release rpcCtx via
      // _return.
      SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));

      SSchedulerHbReq req = {0};
      req.sId = schMgmt.sId;
      req.header.vgId = addr->nodeId;
      req.epId.nodeId = addr->nodeId;
      memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));

      // Keep the serializer result signed: msgSize is unsigned, so a negative
      // error value stored there could never be detected.
      int32_t hbSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
      if (hbSize < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", hbSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      msgSize = (uint32_t)hbSize;

      msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        SCH_JOB_ELOG("calloc %d failed", msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
      if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      persistHandle = true;
      break;
    }
    default:
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      break;
  }

  SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);

  SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
  SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
                               (rpcCtx.args ? &rpcCtx : NULL)));

  if (msgType == TDMT_VND_QUERY) {
    SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
  schFreeRpcCtx(&rpcCtx);

  taosMemoryFreeClear(msg);
  SCH_RET(code);
}
D
dapan1121 已提交
2272

D
dapan1121 已提交
2273 2274
/*
 * Make sure a heartbeat connection entry exists for the task's current
 * candidate node.
 *
 * When no entry is found in schMgmt.hbConnections one is registered, and a
 * first heartbeat message is sent unless the registration reports that the
 * entry already existed.
 */
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeEpId  nodeEp = {0};
  SQueryNodeAddr *pCandidate = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

  nodeEp.nodeId = pCandidate->nodeId;
  memcpy(&nodeEp.ep, SCH_GET_CUR_EP(pCandidate), sizeof(SEp));

#if 1
  if (NULL == taosHashGet(schMgmt.hbConnections, &nodeEp, sizeof(SQueryNodeEpId))) {
    bool alreadyRegistered = false;

    SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &nodeEp, &alreadyRegistered));
    if (!alreadyRegistered) {
      SCH_ERR_RET(schBuildAndSendHbMsg(&nodeEp));
    }
  }
#endif

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
2293

D
dapan1121 已提交
2294
// Launch one task: count it as launched on its level, put it into the job's
// exec list, serialize its sub-plan if that has not been done yet, pick the
// candidate addresses, make sure a heartbeat connection exists (query jobs
// only) and finally send the plan's message.
// Returns TSDB_CODE_SUCCESS, the job's error code when the job must stop, or
// the first error raised by one of the steps.
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  int8_t  status = 0;

  atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status));

    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  // NOTE: race condition: the task should be put into the hash table before send msg to server
  if (JOB_TASK_STATUS_EXECUTING != SCH_GET_TASK_STATUS(pTask)) {
    SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
  }

  SSubplan *pPlan = pTask->plan;

  // Serialize the sub-plan only once; the result is cached on the task.
  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    code = qSubPlanToString(pPlan, &pTask->msg, &pTask->msgLen);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }

    SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, pPlan->msgType));

  return TSDB_CODE_SUCCESS;
}

// Launch a task, honouring flow control when the task requires it.
// The task handle is reset first. With flow control, the task is launched only
// if the quota check reports enough quota; otherwise nothing is launched and
// success is returned.
// Note: callers need no further error processing; any failure is routed
// through schProcessOnTaskFailure inside this function.
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  bool    enough = false;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));

    if (!enough) {
      return TSDB_CODE_SUCCESS;
    }
  }

  SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
D
dapan1121 已提交
2359

D
dapan1121 已提交
2360
// Launch every task of the given level in index order.
// Stops at, and returns, the first error reported by schLaunchTask.
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  int32_t idx = 0;

  while (idx < level->taskNum) {
    SSchTask *pTask = taosArrayGet(level->subTasks, idx);

    SCH_ERR_RET(schLaunchTask(pJob, pTask));

    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

// Start executing a job: move it to the EXECUTING status, evaluate flow
// control for the level at pJob->levelIdx, then launch that level's tasks.
// Returns the first error raised by any of these steps.
int32_t schLaunchJob(SSchJob *pJob) {
  SSchLevel *pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);

  SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING));
  SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, pLevel));
  SCH_ERR_RET(schLaunchLevelTasks(pJob, pLevel));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2382
// Send a TDMT_VND_DROP_TASK message to every node recorded in
// pTask->execNodes. The task handle is switched to each node's handle before
// its message is sent. The send result is not checked (best effort).
void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosArrayGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  for (int32_t idx = 0; idx < nodeNum; ++idx) {
    SSchNodeInfo *pNode = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, idx);

    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_VND_DROP_TASK);
  }

  SCH_TASK_DLOG("task has %d exec address", nodeNum);
}

// Drop every task stored in the given hash list on the nodes it ran on.
// Does nothing unless SCH_IS_NEED_DROP_JOB(pJob) holds. Each hash value is
// read as a pointer to an SSchTask pointer.
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_IS_NEED_DROP_JOB(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); pIter; pIter = taosHashIterate(list, pIter)) {
    SSchTask *pTask = *(SSchTask **)pIter;

    schDropTaskOnExecutedNode(pJob, pTask);
  }
}
H
Haojun Liao 已提交
2420

D
dapan1121 已提交
2421 2422 2423 2424
// Drop the tasks of all three task lists of a job, in the order
// executing -> succeeded -> failed.
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *taskLists[] = {pJob->execTasks, pJob->succTasks, pJob->failTasks};

  for (int32_t i = 0; i < (int32_t)(sizeof(taskLists) / sizeof(taskLists[0])); ++i) {
    schDropTaskInHashList(pJob, taskLists[i]);
  }
}
2426

D
dapan1121 已提交
2427
// Cancel a job. Not implemented yet: always reports success.
int32_t schCancelJob(SSchJob *pJob) {
  // TODO
  // TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2433
// Close the job reference set once the scheduler is exiting and no job is
// left. Nothing happens unless schMgmt.exit is set. The check and the close
// are done under the scheduler write lock, and jobRef is reset to -1.
void schCloseJobRef(void) {
  if (atomic_load_8((int8_t *)&schMgmt.exit)) {
    SCH_LOCK(SCH_WRITE, &schMgmt.lock);

    if (atomic_load_32(&schMgmt.jobNum) <= 0) {
      if (schMgmt.jobRef >= 0) {
        taosCloseRef(schMgmt.jobRef);
        schMgmt.jobRef = -1;
      }
    }

    SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
  }
}

D
dapan1121 已提交
2446 2447 2448 2449 2450 2451 2452
void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
L
Liu Jicong 已提交
2453
  int64_t  refId = pJob->refId;
D
dapan1121 已提交
2454 2455 2456 2457 2458 2459 2460

  if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
    schCancelJob(pJob);
  }

  schDropJobAllTasks(pJob);

L
Liu Jicong 已提交
2461 2462
  pJob->subPlans = NULL;  // it is a reference to pDag->pSubplans

D
dapan1121 已提交
2463
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
L
Liu Jicong 已提交
2464
  for (int32_t i = 0; i < numOfLevels; ++i) {
D
dapan1121 已提交
2465 2466 2467
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
L
Liu Jicong 已提交
2468 2469
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
D
dapan1121 已提交
2470 2471 2472 2473 2474
      schFreeTask(pTask);
    }

    taosArrayDestroy(pLevel->subTasks);
  }
L
Liu Jicong 已提交
2475

D
dapan1121 已提交
2476 2477
  schFreeFlowCtrl(pJob);

D
dapan1121 已提交
2478 2479 2480
  taosHashCleanup(pJob->execTasks);
  taosHashCleanup(pJob->failTasks);
  taosHashCleanup(pJob->succTasks);
L
Liu Jicong 已提交
2481

D
dapan1121 已提交
2482 2483
  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);
D
dapan1121 已提交
2484
  taosArrayDestroy(pJob->dataSrcTasks);
L
Liu Jicong 已提交
2485

D
dapan1121 已提交
2486 2487
  qExplainFreeCtx(pJob->explainCtx);

D
dapan1121 已提交
2488 2489 2490 2491 2492 2493
  if (SCH_IS_QUERY_JOB(pJob)) {
    taosArrayDestroy((SArray *)pJob->queryRes);
  } else {
    tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes);
  }

wafwerar's avatar
wafwerar 已提交
2494 2495
  taosMemoryFreeClear(pJob->resData);
  taosMemoryFreeClear(pJob);
D
dapan1121 已提交
2496

L
Liu Jicong 已提交
2497
  qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
D
dapan1121 已提交
2498 2499 2500 2501

  atomic_sub_fetch_32(&schMgmt.jobNum, 1);

  schCloseJobRef();
D
dapan1121 已提交
2502 2503
}

L
Liu Jicong 已提交
2504
// Create a job from the query plan and launch it.
// On success *job receives the new job's refId. When syncSchedule is true the
// call blocks on the job's rspSem before returning. The job reference is
// released before returning success.
// On failure the job object is passed to schFreeJobImpl and the error code is
// returned.
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                              int64_t startTs, bool syncSchedule) {
  int32_t  code = 0;
  SSchJob *pJob = NULL;

  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  // An empty node list is only reported, not rejected.
  if (NULL == pNodeList || taosArrayGetSize(pNodeList) <= 0) {
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
  }

  SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));

  SCH_ERR_JRET(schLaunchJob(pJob));

  *job = pJob->refId;

  if (syncSchedule) {
    SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
    tsem_wait(&pJob->rspSem);
  }

  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  schFreeJobImpl(pJob);
  SCH_RET(code);
}
D
dapan1121 已提交
2536

D
dapan1121 已提交
2537
// Execute a static EXPLAIN: no tasks are launched. A job object is allocated,
// the explain result is produced locally by qExecStaticExplain into
// pJob->resData, and the job is registered in the job reference set with
// status PARTIAL_SUCCEED. On success *job receives the job's refId.
// transport, pNodeList and syncSchedule are not used by this function.
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
                             bool syncSchedule) {
  int32_t code = 0;

  qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);

  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pJob->queryId = pDag->queryId;
  pJob->subPlans = pDag->pSubplans;
  pJob->sql = sql;
  pJob->attr.queryJob = true;
  pJob->attr.explainMode = pDag->explainInfo.mode;

  SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));

  int64_t newRefId = taosAddRef(schMgmt.jobRef, pJob);
  if (newRefId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  // NOTE(review): this path returns without freeing or unregistering the job
  // that was just added to the reference set - confirm this is intended.
  if (NULL == schAcquireJob(newRefId)) {
    SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, newRefId);
    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  pJob->refId = newRefId;

  SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);

  pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
  *job = pJob->refId;
  SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

  schReleaseJob(pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  schFreeJobImpl(pJob);
  SCH_RET(code);
}

D
dapan1121 已提交
2585
// Initialise the global scheduler state (schMgmt).
// cfg may be NULL, in which case the default job/table limits are used; zero
// (or non-positive) limits in a supplied cfg are replaced by the defaults.
// Opens the job reference set, creates the heartbeat-connection hash and
// generates the scheduler id.
// Returns TSDB_CODE_QRY_INVALID_INPUT when already initialised,
// TSDB_CODE_QRY_OUT_OF_MEMORY / TSDB_CODE_QRY_SYS_ERROR on failure.
// On failure everything created by this call is released again and jobRef is
// reset to -1, so that a later schedulerInit call is not rejected as
// "already initialized".
int32_t schedulerInit(SSchedulerCfg *cfg) {
  int32_t code = 0;

  if (schMgmt.jobRef >= 0) {
    qError("scheduler already initialized");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (cfg) {
    schMgmt.cfg = *cfg;

    if (schMgmt.cfg.maxJobNum == 0) {
      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    }
    if (schMgmt.cfg.maxNodeTableNum <= 0) {
      schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
    }
  } else {
    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
    schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
  }

  schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
  if (schMgmt.jobRef < 0) {
    qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  schMgmt.hbConnections = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == schMgmt.hbConnections) {
    qError("taosHashInit hb connections failed");
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
    qError("generate schdulerId failed, errno:%d", errno);
    SCH_ERR_JRET(TSDB_CODE_QRY_SYS_ERROR);
  }

  qInfo("scheduler %" PRIx64 " initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);

  return TSDB_CODE_SUCCESS;

_return:

  // Roll back the partial initialisation.
  if (schMgmt.hbConnections) {
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
  }

  taosCloseRef(schMgmt.jobRef);
  schMgmt.jobRef = -1;

  SCH_RET(code);
}

L
Liu Jicong 已提交
2627
// Execute a query plan synchronously.
// transport, pDag, pDag->pSubplans, pJob and pRes must be non-NULL, otherwise
// TSDB_CODE_QRY_INVALID_INPUT is returned.
// *pJob is reset to 0 and receives the job refId once a job was created.
// When a job exists after execution, its error code, row count and query
// result are copied into pRes; ownership of queryRes moves to pRes (the job's
// pointer is cleared).
// Returns the execution error code, or TSDB_CODE_SCH_STATUS_ERROR when the
// job can no longer be acquired to read its result.
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
                         int64_t startTs, SQueryResult *pRes) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;

  *pJob = 0;

  // Both branches go through _return so the result handling is identical.
  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
    SCH_ERR_JRET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
  } else {
    SCH_ERR_JRET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
  }

_return:

  if (*pJob) {
    SSchJob *job = schAcquireJob(*pJob);
    if (NULL == job) {
      // The job is gone from the reference set; do not dereference it.
      qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, *pJob);
      if (TSDB_CODE_SUCCESS == code) {
        code = TSDB_CODE_SCH_STATUS_ERROR;
      }
      return code;
    }

    pRes->code = atomic_load_32(&job->errCode);
    pRes->numOfRows = job->resNumOfRows;
    pRes->res = job->queryRes;
    job->queryRes = NULL;

    schReleaseJob(*pJob);
  }

  return code;
}

L
Liu Jicong 已提交
2659
// Execute a query plan without waiting for the response.
// transport, pDag, pDag->pSubplans and pJob must be non-NULL, otherwise
// TSDB_CODE_QRY_INVALID_INPUT is returned. Static EXPLAIN plans are answered
// by schExecStaticExplain, everything else by schExecJobImpl with a start
// timestamp of 0.
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = TSDB_CODE_SUCCESS;

  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
    code = schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false);
  } else {
    code = schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false);
  }

  SCH_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2673
#if 0
X
Xiaoyu Wang 已提交
2674
// Convert a single-level query plan into an array of STaskInfo, one entry per
// sub-plan, each carrying a freshly allocated SSubQueryMsg.
// pDag must have exactly one level with at least one sub-plan, otherwise
// TSDB_CODE_QRY_INVALID_INPUT is returned.
// On success *pTasks receives the array; on failure the partially built array
// is released with schedulerFreeTaskList.
int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
  if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t levelNum = LIST_LENGTH(pDag->pSubplans);
  if (1 != levelNum) {
    qError("invalid level num: %d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SNodeListNode *plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, 0);
  int32_t taskNum = LIST_LENGTH(plans->pNodeList);
  if (taskNum <= 0) {
    qError("invalid task num: %d", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo));
  if (NULL == info) {
    qError("taosArrayInit %d taskInfo failed", taskNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  STaskInfo tInfo = {0};
  char *msg = NULL;
  int32_t msgLen = 0;
  int32_t code = 0;

  for (int32_t i = 0; i < taskNum; ++i) {
    SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, i);
    tInfo.addr = plan->execNode;

    code = qSubPlanToString(plan, &msg, &msgLen);
    if (TSDB_CODE_SUCCESS != code) {
      qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);
      SCH_ERR_JRET(code);
    }

    if (NULL == msg) {
      qError("subplanToString returned no msg, len:%d", msgLen);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;

    SSubQueryMsg* pMsg = taosMemoryCalloc(1, msgSize);
    if (NULL == pMsg) {
      // The allocation result was never checked before being written to.
      qError("calloc %d failed", msgSize);
      taosMemoryFreeClear(msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    pMsg->header.vgId = tInfo.addr.nodeId;

    pMsg->sId      = schMgmt.sId;
    pMsg->queryId  = plan->id.queryId;
    pMsg->taskId   = schGenUUID();
    pMsg->taskType = TASK_TYPE_PERSISTENT;
    pMsg->phyLen   = msgLen;
    pMsg->sqlLen   = 0;
    memcpy(pMsg->msg, msg, msgLen);

    // The plan string has been copied into pMsg; release it so it is not
    // leaked on every iteration.
    taosMemoryFreeClear(msg);

    tInfo.msg = pMsg;

    if (NULL == taosArrayPush(info, &tInfo)) {
      qError("taosArrayPush failed, idx:%d", i);
      // pMsg is not owned by the array yet, so it has to be freed here.
      taosMemoryFree(pMsg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

  *pTasks = info;
  info = NULL;

_return:
  // info is NULL on success, which schedulerFreeTaskList ignores.
  schedulerFreeTaskList(info);
  SCH_RET(code);
}

D
dapan1121 已提交
2749 2750 2751 2752 2753 2754
// Create copyNum copies of the task described by src and store them in a new
// array returned through *dst. Every copy gets its own message buffer (a byte
// copy of src->msg) with a newly generated task id.
// Returns TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or copyNum <= 0 and
// TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure; on failure *dst is
// released and set to NULL.
int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
  if (NULL == src || NULL == dst || copyNum <= 0) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;

  *dst = taosArrayInit(copyNum, sizeof(STaskInfo));
  if (NULL == *dst) {
    qError("taosArrayInit %d taskInfo failed", copyNum);
    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  int32_t   msgSize = src->msg->phyLen + sizeof(*src->msg);
  STaskInfo copy = {0};

  copy.addr = src->addr;

  int32_t idx = 0;
  while (idx < copyNum) {
    copy.msg = taosMemoryMalloc(msgSize);
    if (NULL == copy.msg) {
      qError("malloc %d failed", msgSize);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(copy.msg, src->msg, msgSize);

    copy.msg->taskId = schGenUUID();

    if (NULL == taosArrayPush(*dst, &copy)) {
      qError("taosArrayPush failed, idx:%d", idx);
      taosMemoryFree(copy.msg);
      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    ++idx;
  }

  return TSDB_CODE_SUCCESS;

_return:

  schedulerFreeTaskList(*dst);
  *dst = NULL;

  SCH_RET(code);
}
L
Liu Jicong 已提交
2794
#endif
D
dapan1121 已提交
2795

L
Liu Jicong 已提交
2796
int32_t schedulerFetchRows(int64_t job, void **pData) {
D
dapan1121 已提交
2797
  if (NULL == pData) {
D
dapan1121 已提交
2798
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan 已提交
2799 2800
  }

L
Liu Jicong 已提交
2801
  int32_t  code = 0;
D
dapan1121 已提交
2802
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2803 2804 2805 2806
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2807

D
dapan1121 已提交
2808 2809
  int8_t status = SCH_GET_JOB_STATUS(pJob);
  if (status == JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
2810
    SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2811
    schReleaseJob(job);
D
dapan1121 已提交
2812
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan1121 已提交
2813 2814
  }

D
dapan1121 已提交
2815
  if (!SCH_JOB_NEED_FETCH(pJob)) {
D
dapan1121 已提交
2816
    SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
D
dapan1121 已提交
2817
    schReleaseJob(job);
D
dapan1121 已提交
2818
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
2819 2820
  }

D
dapan1121 已提交
2821 2822
  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
D
dapan1121 已提交
2823
    schReleaseJob(job);
D
dapan1121 已提交
2824
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan 已提交
2825 2826
  }

D
dapan1121 已提交
2827
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2828
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2829 2830
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
  } else if (status == JOB_TASK_STATUS_SUCCEED) {
D
dapan1121 已提交
2831
    SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2832 2833
    goto _return;
  } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
D
dapan1121 已提交
2834
    if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
D
dapan1121 已提交
2835 2836
      SCH_ERR_JRET(schFetchFromRemote(pJob));
      tsem_wait(&pJob->rspSem);
H
Hongze Cheng 已提交
2837
    }
D
dapan1121 已提交
2838 2839 2840
  } else {
    SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
D
dapan 已提交
2841 2842
  }

D
dapan1121 已提交
2843
  status = SCH_GET_JOB_STATUS(pJob);
D
dapan 已提交
2844

D
dapan1121 已提交
2845
  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
D
dapan1121 已提交
2846
    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
2847
    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
D
dapan 已提交
2848
  }
L
Liu Jicong 已提交
2849

D
dapan1121 已提交
2850
  if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
D
dapan1121 已提交
2851
    SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
D
dapan 已提交
2852 2853
  }

D
dapan1121 已提交
2854
  while (true) {
D
dapan1121 已提交
2855 2856
    *pData = atomic_load_ptr(&pJob->resData);
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
D
dapan1121 已提交
2857 2858 2859 2860 2861
      continue;
    }

    break;
  }
D
dapan 已提交
2862

D
dapan1121 已提交
2863
  if (NULL == *pData) {
wafwerar's avatar
wafwerar 已提交
2864
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
2865 2866 2867 2868 2869
    if (rsp) {
      rsp->completed = 1;
    }

    *pData = rsp;
D
dapan1121 已提交
2870
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
D
dapan1121 已提交
2871
  }
D
dapan1121 已提交
2872

2873
  SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
D
dapan1121 已提交
2874 2875 2876 2877

_return:

  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
L
Liu Jicong 已提交
2878

D
dapan1121 已提交
2879
  schReleaseJob(job);
D
dapan 已提交
2880

D
dapan1121 已提交
2881
  SCH_RET(code);
D
dapan 已提交
2882
}
D
dapan1121 已提交
2883

D
dapan1121 已提交
2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898
// Append one SQuerySubDesc (task id + status) per task of the job to pSub,
// walking the levels from the last index down to 0.
// Returns TSDB_CODE_SCH_STATUS_ERROR when the job cannot be acquired or is
// not in an executable state, TSDB_CODE_QRY_OUT_OF_MEMORY when an entry
// cannot be appended.
// The job reference taken here is released on every path after a successful
// acquire.
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
  int32_t  code = 0;
  SSchJob *pJob = schAcquireJob(job);
  if (NULL == pJob) {
    qDebug("acquire job from jobRef list failed, may not started or dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) {
    qDebug("job not initialized or not executable job, refId:%" PRIx64, job);
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask     *pTask = taosArrayGet(pLevel->subTasks, m);
      SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};

      if (NULL == taosArrayPush(pSub, &subDesc)) {
        qError("taosArrayPush task status failed, refId:%" PRIx64, job);
        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }
    }
  }

_return:

  // Balance the schAcquireJob above; without this the job reference leaks.
  schReleaseJob(job);

  SCH_RET(code);
}

D
dapan1121 已提交
2911
int32_t scheduleCancelJob(int64_t job) {
D
dapan1121 已提交
2912
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2913 2914 2915 2916
  if (NULL == pJob) {
    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }
D
dapan1121 已提交
2917

D
dapan1121 已提交
2918 2919
  int32_t code = schCancelJob(pJob);

D
dapan1121 已提交
2920
  schReleaseJob(job);
D
dapan1121 已提交
2921 2922

  SCH_RET(code);
D
dapan1121 已提交
2923 2924
}

D
dapan1121 已提交
2925
void schedulerFreeJob(int64_t job) {
D
dapan1121 已提交
2926
  SSchJob *pJob = schAcquireJob(job);
D
dapan1121 已提交
2927
  if (NULL == pJob) {
D
dapan1121 已提交
2928
    qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
D
dapan 已提交
2929 2930
    return;
  }
D
dapan1121 已提交
2931

D
dapan1121 已提交
2932 2933
  if (atomic_load_8(&pJob->userFetch) > 0) {
    schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
D
dapan1121 已提交
2934
  }
D
dapan1121 已提交
2935

D
dapan1121 已提交
2936
  SCH_JOB_DLOG("start to remove job from jobRef list, refId:%" PRIx64, job);
2937

D
dapan1121 已提交
2938 2939
  if (taosRemoveRef(schMgmt.jobRef, job)) {
    SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job);
2940
  }
D
dapan1121 已提交
2941 2942

  schReleaseJob(job);
D
dapan1121 已提交
2943
}
D
dapan1121 已提交
2944 2945 2946 2947 2948 2949 2950 2951 2952

// Free a task list: release the message buffer of every STaskInfo entry,
// then destroy the array itself. NULL is accepted and ignored.
void schedulerFreeTaskList(SArray *taskList) {
  if (NULL == taskList) {
    return;
  }

  int32_t total = taosArrayGetSize(taskList);
  int32_t idx = 0;
  while (idx < total) {
    STaskInfo *pInfo = taosArrayGet(taskList, idx);
    taosMemoryFreeClear(pInfo->msg);
    ++idx;
  }

  taosArrayDestroy(taskList);
}
L
Liu Jicong 已提交
2958

D
dapan1121 已提交
2959
void schedulerDestroy(void) {
2960 2961
  atomic_store_8((int8_t *)&schMgmt.exit, 1);

D
dapan1121 已提交
2962
  if (schMgmt.jobRef >= 0) {
D
dapan1121 已提交
2963
    SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
H
Hongze Cheng 已提交
2964
    int64_t  refId = 0;
C
Cary Xu 已提交
2965

D
dapan1121 已提交
2966
    while (pJob) {
D
dapan1121 已提交
2967
      refId = pJob->refId;
C
Cary Xu 已提交
2968 2969 2970
      if (refId == 0) {
        break;
      }
D
dapan1121 已提交
2971
      taosRemoveRef(schMgmt.jobRef, pJob->refId);
L
Liu Jicong 已提交
2972

D
dapan1121 已提交
2973
      pJob = taosIterateRef(schMgmt.jobRef, refId);
D
dapan1121 已提交
2974
    }
D
dapan1121 已提交
2975
  }
D
dapan1121 已提交
2976 2977

  if (schMgmt.hbConnections) {
H
Hongze Cheng 已提交
2978
    void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
D
dapan1121 已提交
2979 2980 2981 2982
    while (pIter != NULL) {
      SSchHbTrans *hb = pIter;
      schFreeRpcCtx(&hb->rpcCtx);
      pIter = taosHashIterate(schMgmt.hbConnections, pIter);
H
Hongze Cheng 已提交
2983
    }
D
dapan1121 已提交
2984 2985 2986
    taosHashCleanup(schMgmt.hbConnections);
    schMgmt.hbConnections = NULL;
  }
D
dapan1121 已提交
2987
}