schInt.h 25.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_SCHEDULER_INT_H_
#define _TD_SCHEDULER_INT_H_

#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
23
#include "command.h"
24 25 26
#include "os.h"
#include "planner.h"
#include "scheduler.h"
H
Hongze Cheng 已提交
27
#include "tarray.h"
28
#include "thash.h"
D
dapan1121 已提交
29
#include "trpc.h"
30

D
dapan1121 已提交
31 32 33 34 35
/* Lock modes for SCH_LOCK / SCH_UNLOCK; any value other than SCH_READ takes the write path. */
enum {
  SCH_READ = 1,
  SCH_WRITE,
};

/* Callback kinds (execution vs. fetch). */
enum {
  SCH_EXEC_CB = 1,
  SCH_FETCH_CB,
};

/* Operation currently running on a job; stored in SSchOpStatus.op. */
typedef enum {
  SCH_OP_NULL = 0,
  SCH_OP_EXEC,
  SCH_OP_FETCH,
  SCH_OP_GET_STATUS,
} SCH_OP_TYPE;

/* Scheduling policy held in SSchedulerCfg.schPolicy (default: SCHEDULE_DEFAULT_POLICY). */
typedef enum {
  SCH_LOAD_SEQ = 1,
  SCH_RANDOM,
  SCH_ALL,
} SCH_POLICY;

/* Default scheduler limits and policy. */
#define SCHEDULE_DEFAULT_MAX_JOB_NUM        1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM       1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200  // unit is TSDB_TABLE_NUM_UNIT
#define SCHEDULE_DEFAULT_POLICY             SCH_LOAD_SEQ
#define SCHEDULE_DEFAULT_MAX_NODE_NUM       20

/* Task timeouts in microseconds: 60 s by default, 300 s at most. */
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 60000000
#define SCH_MAX_TASK_TIMEOUT_USEC     300000000

#define SCH_DEFAULT_MAX_RETRY_NUM     6
#define SCH_MIN_AYSNC_EXEC_NUM        3
/* Correctly spelled alias; the misspelled name above is kept for existing users. */
#define SCH_MIN_ASYNC_EXEC_NUM        SCH_MIN_AYSNC_EXEC_NUM

/* Scheduler debug switches; the global instance is gSCHDebug. */
typedef struct SSchDebug {
  bool lockEnable;  // when set, SCH_LOCK_DEBUG logs every SCH_LOCK/SCH_UNLOCK step
  bool apiEnable;
} SSchDebug;

/* Pair of transport handles kept together for sending messages. */
typedef struct SSchTrans {
  void *pTrans;
  void *pHandle;
} SSchTrans;

typedef struct SSchHbTrans {
  SRWLatch  lock;
D
dapan1121 已提交
77
  int64_t   taskNum;
D
dapan1121 已提交
78
  SRpcCtx   rpcCtx;
D
dapan1121 已提交
79 80 81
  SSchTrans trans;
} SSchHbTrans;

D
dapan1121 已提交
82
/*
 * Statistics placeholders.
 *
 * ISO C requires every struct to have at least one member. An empty struct is
 * a GNU extension with size 0 in C but size 1 in C++. The placeholder is
 * therefore declared on every platform, not only on Windows/macOS, so that C
 * and C++ translation units agree on the layout of SSchStat and of
 * SSchedulerMgmt, which embeds it.
 */
typedef struct SSchApiStat {
  size_t avoidCompilationErrors;
} SSchApiStat;

typedef struct SSchRuntimeStat {
  size_t avoidCompilationErrors;
} SSchRuntimeStat;

typedef struct SSchJobStat {
  size_t avoidCompilationErrors;
} SSchJobStat;

/* Aggregated scheduler statistics (SSchedulerMgmt.stat). */
typedef struct SSchStat {
  SSchApiStat     api;
  SSchRuntimeStat runtime;
  SSchJobStat     job;
} SSchStat;

typedef struct SSchResInfo {
H
Hongze Cheng 已提交
110 111 112 113 114
  SExecResult     *execRes;
  void           **fetchRes;
  schedulerExecFp  execFp;
  schedulerFetchFp fetchFp;
  void            *cbParam;
D
dapan1121 已提交
115
} SSchResInfo;
D
dapan1121 已提交
116

D
dapan1121 已提交
117 118 119 120 121 122
typedef struct SSchOpEvent {
  SCH_OP_TYPE    type;
  bool           begin;
  SSchedulerReq *pReq;
} SSchOpEvent;

H
Hongze Cheng 已提交
123 124 125
typedef int32_t (*schStatusEnterFp)(void *pHandle, void *pParam);
typedef int32_t (*schStatusLeaveFp)(void *pHandle, void *pParam);
typedef int32_t (*schStatusEventFp)(void *pHandle, void *pParam, void *pEvent);
D
dapan1121 已提交
126 127 128 129 130 131 132 133

typedef struct SSchStatusFps {
  EJobTaskType     status;
  schStatusEnterFp enterFp;
  schStatusLeaveFp leaveFp;
  schStatusEventFp eventFp;
} SSchStatusFps;

D
dapan1121 已提交
134 135 136 137 138 139 140
typedef struct SSchedulerCfg {
  uint32_t   maxJobNum;
  int32_t    maxNodeTableNum;
  SCH_POLICY schPolicy;
  bool       enableReSchedule;
} SSchedulerCfg;

141
typedef struct SSchedulerMgmt {
H
Hongze Cheng 已提交
142 143 144 145 146 147 148 149 150 151
  uint64_t      taskId;  // sequential taksId
  uint64_t      sId;     // schedulerId
  SSchedulerCfg cfg;
  bool          exit;
  int32_t       jobRef;
  int32_t       jobNum;
  SSchStat      stat;
  SRWLatch      hbLock;
  SHashObj     *hbConnections;
  void         *queryMgmt;
152
} SSchedulerMgmt;
153

D
dapan1121 已提交
154 155 156 157 158 159 160 161 162
/*
 * Common first member of both callback parameter structs below. A callback
 * can read `isHbParam` to tell an SSchHbCallbackParam from an
 * SSchTaskCallbackParam.
 */
typedef struct SSchCallbackParamHeader {
  bool isHbParam;
} SSchCallbackParamHeader;

/* Callback parameter for a task request. */
typedef struct SSchTaskCallbackParam {
  SSchCallbackParamHeader head;
  uint64_t                queryId;
  int64_t                 refId;
  uint64_t                taskId;
  int32_t                 execId;
  void                   *pTrans;
} SSchTaskCallbackParam;

/* Callback parameter for a heartbeat request. */
typedef struct SSchHbCallbackParam {
  SSchCallbackParamHeader head;
  SQueryNodeEpId          nodeEpId;
  void                   *pTrans;
} SSchHbCallbackParam;

typedef struct SSchFlowControl {
H
Hongze Cheng 已提交
174 175 176 177 178
  SRWLatch lock;
  bool     sorted;
  int32_t  tableNumSum;
  uint32_t execTaskNum;
  SArray  *taskList;  // Element is SSchTask*
D
dapan1121 已提交
179 180
} SSchFlowControl;

D
dapan1121 已提交
181 182 183 184 185
typedef struct SSchNodeInfo {
  SQueryNodeAddr addr;
  void          *handle;
} SSchNodeInfo;

D
dapan 已提交
186
typedef struct SSchLevel {
H
Hongze Cheng 已提交
187 188 189 190 191 192 193 194 195
  int32_t  level;
  int8_t   status;
  SRWLatch lock;
  int32_t  taskFailed;
  int32_t  taskSucceed;
  int32_t  taskNum;
  int32_t  taskLaunchedNum;
  int32_t  taskDoneNum;
  SArray  *subTasks;  // Element is SSchTask
D
dapan 已提交
196
} SSchLevel;
D
dapan1121 已提交
197

D
dapan1121 已提交
198
typedef struct SSchTaskProfile {
H
Hongze Cheng 已提交
199 200 201 202
  int64_t startTs;
  SArray *execTime;
  int64_t waitTime;
  int64_t endTs;
D
dapan1121 已提交
203 204
} SSchTaskProfile;

D
dapan 已提交
205
typedef struct SSchTask {
H
Hongze Cheng 已提交
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
  uint64_t        taskId;          // task id
  SRWLatch        lock;            // task reentrant lock
  int32_t         maxExecTimes;    // task max exec times
  int32_t         maxRetryTimes;   // task max retry times
  int32_t         retryTimes;      // task retry times
  bool            waitRetry;       // wait for retry
  int32_t         execId;          // task current execute index
  SSchLevel      *level;           // level
  SRWLatch        planLock;        // task update plan lock
  SSubplan       *plan;            // subplan
  char           *msg;             // operator tree
  int32_t         msgLen;          // msg length
  int8_t          status;          // task status
  int32_t         lastMsgType;     // last sent msg type
  int64_t         timeoutUsec;     // task timeout useconds before reschedule
  SQueryNodeAddr  succeedAddr;     // task executed success node address
  int8_t          candidateIdx;    // current try condidation index
  SArray         *candidateAddrs;  // condidate node addresses, element is SQueryNodeAddr
  SHashObj       *execNodes;       // all tried node for current task, element is SSchNodeInfo
  SSchTaskProfile profile;         // task execution profile
  int32_t         childReady;      // child task ready number
  SArray         *children;        // the datasource tasks,from which to fetch the result, element is SQueryTask*
  SArray         *parents;         // the data destination tasks, get data from current task, element is SQueryTask*
  void           *handle;          // task send handle
  bool            registerdHb;     // registered in hb
D
dapan 已提交
231
} SSchTask;
D
dapan1121 已提交
232

D
dapan 已提交
233
typedef struct SSchJobAttr {
D
dapan1121 已提交
234 235
  EExplainMode explainMode;
  bool         queryJob;
D
dapan1121 已提交
236
  bool         insertJob;
D
dapan1121 已提交
237
  bool         needFetch;
D
dapan1121 已提交
238
  bool         needFlowCtrl;
D
dapan1121 已提交
239
  bool         localExec;
D
dapan 已提交
240
} SSchJobAttr;
D
dapan1121 已提交
241

D
dapan1121 已提交
242
typedef struct {
H
Hongze Cheng 已提交
243 244 245
  int32_t  op;
  SRWLatch lock;
  bool     syncReq;
D
dapan1121 已提交
246 247
} SSchOpStatus;

D
dapan 已提交
248
typedef struct SSchJob {
H
Hongze Cheng 已提交
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
  int64_t          refId;
  uint64_t         queryId;
  SSchJobAttr      attr;
  int32_t          levelNum;
  int32_t          taskNum;
  SRequestConnInfo conn;
  SArray          *nodeList;  // qnode/vnode list, SArray<SQueryNodeLoad>
  SArray          *levels;    // starting from 0. SArray<SSchLevel>
  SQueryPlan      *pDag;
  int64_t          allocatorRefId;

  SArray   *dataSrcTasks;  // SArray<SQueryTask*>
  int32_t   levelIdx;
  SEpSet    dataSrcEps;
  SHashObj *taskList;
  SHashObj *execTasks;  // executing and executed tasks, key:taskid, value:SQueryTask*
  SHashObj *flowCtrl;   // key is ep, element is SSchFlowControl

  SExplainCtx         *explainCtx;
  int8_t               status;
  SQueryNodeAddr       resNode;
  tsem_t               rspSem;
  SSchOpStatus         opStatus;
  schedulerChkKillFp   chkKillFp;
  void                *chkKillParam;
  SSchTask            *fetchTask;
  int32_t              errCode;
  SRWLatch             resLock;
  SExecResult          execRes;
  void                *fetchRes;  // TODO free it or not
  bool                 fetched;
  int32_t              resNumOfRows;
  SSchResInfo          userRes;
  char                *sql;
283
  SQueryProfileSummary summary;
D
dapan 已提交
284
} SSchJob;
D
dapan1121 已提交
285

D
dapan1121 已提交
286
typedef struct SSchTaskCtx {
287
  int64_t   jobRid;
D
dapan1121 已提交
288
  SSchTask *pTask;
289
  bool      asyncLaunch;
D
dapan1121 已提交
290 291
} SSchTaskCtx;

D
dapan1121 已提交
292 293
extern SSchedulerMgmt schMgmt;

H
Hongze Cheng 已提交
294 295
/* True when the current execution (execId) of _task has been running longer than its timeoutUsec. */
#define SCH_TASK_TIMEOUT(_task) \
  ((taosGetTimestampUs() - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)

/* True once readyNum has reached the number of child tasks of task. */
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))

/* Take / release the task's own latch in write mode. */
#define SCH_LOCK_TASK(_task)   SCH_LOCK(SCH_WRITE, &(_task)->lock)
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)

/* Task id / execution id for log prefixes; -1 (converted to the field type) when _task is NULL. */
#define SCH_TASK_ID(_task)  ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)

/* Subplan-type predicates: scan tasks are data-bound queries; scan or modify tasks are data-bound. */
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_BIND_TASK(task) \
  (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
/* A task is a leaf when it sits on the last level of the job. */
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task)  (!SCH_IS_DATA_BIND_TASK(task))
/* Local execution: attr.localExec is set, the job is a query (not insert) job, and the task is not a scan. */
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task)                                          \
  ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
   (!SCH_IS_DATA_BIND_QRY_TASK(_task)))

/* Task status accessors (atomic 8-bit load/store on `status`). */
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task)     atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))

/* Task send-handle accessors; the getter yields NULL for a NULL task. */
#define SCH_GET_TASK_HANDLE(_task)          ((_task) ? (_task)->handle : NULL)
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))

/* Job status accessors (atomic 8-bit load/store on `status`). */
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job)     atomic_load_8(&(job)->status)
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))

/* True while an operation requested synchronously is in progress on the job. */
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.syncReq)
/*
 * Async-op checks. NOTE(review): these are not pure predicates. Assuming
 * atomic_val_compare_exchange_32(ptr, old, new) has compare-and-swap semantics,
 * opStatus.op is reset to SCH_OP_NULL whenever it equals the tested op, even if
 * the syncReq test then makes the result false.
 */
#define SCH_JOB_IN_ASYNC_EXEC_OP(job)                                                                \
  ((SCH_OP_EXEC == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_EXEC, SCH_OP_NULL)) && \
   (!(job)->opStatus.syncReq))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job)                                                                 \
  ((SCH_OP_FETCH == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_FETCH, SCH_OP_NULL)) && \
   (!(job)->opStatus.syncReq))

/* Flow-control flags. The setter is fully parenthesized so it is safe inside larger expressions. */
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl = true)
#define SCH_JOB_NEED_FLOW_CTRL(_job)     ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) \
  (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask)  (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)

/* Job classification: any subplan type other than SUBPLAN_TYPE_MODIFY marks a query job. */
#define SCH_SET_JOB_TYPE(_job, type)     \
  do {                                   \
    if ((type) != SUBPLAN_TYPE_MODIFY) { \
      (_job)->attr.queryJob = true;      \
    } else {                             \
      (_job)->attr.insertJob = true;     \
    }                                    \
  } while (0)
#define SCH_IS_QUERY_JOB(_job)   ((_job)->attr.queryJob)
#define SCH_IS_INSERT_JOB(_job)  ((_job)->attr.insertJob)
#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch)
#define SCH_JOB_NEED_WAIT(_job)  (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job)  (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)

/* Error classification used for redirect and retry decisions. */
#define SCH_NETWORK_ERR(_code)   ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
  (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task))))
#define SCH_REDIRECT_MSGTYPE(_msgType)                                                                         \
  ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
   (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) \
  (SCH_REDIRECT_MSGTYPE(_msgType) &&                            \
   (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
#define SCH_NEED_RETRY(_msgType, _code) \
  ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)

/* True while some tasks of the level have not been launched yet. */
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
/* Endpoint helpers for an address carrying an epSet; switching advances inUse round-robin. */
#define SCH_GET_CUR_EP(_addr)           (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
#define SCH_SWITCH_EPSET(_addr)         ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
#define SCH_TASK_NUM_OF_EPS(_addr)      ((_addr)->epSet.numOfEps)

/* Append the start time of the current execution; the first execution (execId 0) also sets profile.startTs. */
#define SCH_LOG_TASK_START_TS(_task)               \
  do {                                             \
    int64_t us = taosGetTimestampUs();             \
    taosArrayPush((_task)->profile.execTime, &us); \
    if (0 == (_task)->execId) {                    \
      (_task)->profile.startTs = us;               \
    }                                              \
  } while (0)

/* Add the time elapsed since the current execution started to profile.waitTime. */
#define SCH_LOG_TASK_WAIT_TS(_task)                                                                         \
  do {                                                                                                      \
    int64_t us = taosGetTimestampUs();                                                                      \
    (_task)->profile.waitTime += us - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId); \
  } while (0)

/* Replace the current execution's start time with its duration and record profile.endTs. */
#define SCH_LOG_TASK_END_TS(_task)                                               \
  do {                                                                           \
    int64_t  us = taosGetTimestampUs();                                          \
    int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \
    *startts = us - *startts;                                                    \
    (_task)->profile.endTs = us;                                                 \
  } while (0)

/*
 * Logging helpers that prefix the message with the query id. They read a
 * variable named `pJob` at the call site, and need at least one argument after
 * the format string.
 */
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)

/*
 * Task logging helpers: the prefix adds the task id and exec id of a variable
 * named `pTask` (which may be NULL) at the call site.
 */
#define SCH_TASK_ELOG(param, ...)                                                                                     \
  qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
         __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...)                                                                                     \
  qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
         __VA_ARGS__)
#define SCH_TASK_TLOG(param, ...)                                                                                     \
  qTrace("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
         __VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...)                                                                                     \
  qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
          __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...)                                                                                    \
  qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
        __VA_ARGS__)

/* Store _err in terrno unless it is TSDB_CODE_SCH_IGNORE_ERROR. */
#define SCH_SET_ERRNO(_err)                     \
  do {                                          \
    if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { \
      terrno = (_err);                          \
    }                                           \
  } while (0)
/* Evaluate c once; on failure record it via SCH_SET_ERRNO and return it. */
#define SCH_ERR_RET(c)                \
  do {                                \
    int32_t _code = (c);              \
    if (_code != TSDB_CODE_SUCCESS) { \
      SCH_SET_ERRNO(_code);           \
      return _code;                   \
    }                                 \
  } while (0)
/* Evaluate c once, record it on failure, and always return it. */
#define SCH_RET(c)                    \
  do {                                \
    int32_t _code = (c);              \
    if (_code != TSDB_CODE_SUCCESS) { \
      SCH_SET_ERRNO(_code);           \
    }                                 \
    return _code;                     \
  } while (0)
/* Assign c to the caller's `code`; on failure record it and jump to the caller's `_return` label. */
#define SCH_ERR_JRET(c)              \
  do {                               \
    code = (c);                      \
    if (code != TSDB_CODE_SUCCESS) { \
      SCH_SET_ERRNO(code);           \
      goto _return;                  \
    }                                \
  } while (0)

/* Log lock operations through qDebug when gSCHDebug.lockEnable is set. */
#define SCH_LOCK_DEBUG(...)     \
  do {                          \
    if (gSCHDebug.lockEnable) { \
      qDebug(__VA_ARGS__);      \
    }                           \
  } while (0)

/*
 * Write-lock flag value asserted by SCH_LOCK/SCH_UNLOCK. NOTE(review):
 * presumably a copy of the RW latch's internal write flag -- keep in sync with
 * the latch implementation.
 */
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000

/*
 * Acquire the latch _lock (e.g. &task->lock, an SRWLatch). SCH_READ takes a read
 * lock; any other type takes a write lock. The asserts check the latch value:
 * non-negative before acquiring, positive after a read lock, and exactly
 * TD_RWLATCH_WRITE_FLAG_COPY after a write lock. SCH_LOCK_DEBUG logs the step
 * before ("B") and after ("E") the latch call. The asserts vanish under NDEBUG.
 */
#define SCH_LOCK(type, _lock)                                                                        \
  do {                                                                                               \
    if (SCH_READ == (type)) {                                                                        \
      assert(atomic_load_32(_lock) >= 0);                                                            \
      SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosRLockLatch(_lock);                                                                         \
      SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32(_lock) > 0);                                                             \
    } else {                                                                                         \
      assert(atomic_load_32(_lock) >= 0);                                                            \
      SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosWLockLatch(_lock);                                                                         \
      SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY);                                   \
    }                                                                                                \
  } while (0)

/*
 * Release the latch _lock taken by SCH_LOCK with the same type. The read path
 * asserts a positive value before releasing. The write path only asserts that
 * the TD_RWLATCH_WRITE_FLAG_COPY bit is set. Both assert a non-negative value
 * afterwards, and each step is logged like SCH_LOCK ("B" before, "E" after).
 */
#define SCH_UNLOCK(type, _lock)                                                                       \
  do {                                                                                                \
    if (SCH_READ == (type)) {                                                                         \
      assert(atomic_load_32((_lock)) > 0);                                                            \
      SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosRUnLockLatch(_lock);                                                                        \
      SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) >= 0);                                                           \
    } else {                                                                                          \
      assert(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY);                                   \
      SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosWUnLockLatch(_lock);                                                                        \
      SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) >= 0);                                                           \
    }                                                                                                 \
  } while (0)

void     schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void     schCleanClusterHb(void *pTrans);
int32_t  schLaunchTask(SSchJob *job, SSchTask *task);
int32_t  schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
D
dapan1121 已提交
492
SSchJob *schAcquireJob(int64_t refId);
H
Hongze Cheng 已提交
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
int32_t  schReleaseJob(int64_t refId);
void     schFreeFlowCtrl(SSchJob *pJob);
int32_t  schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t  schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t  schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t  schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t  schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t  schLaunchFetchTask(SSchJob *pJob);
int32_t  schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t  schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction);
int32_t  schCloneSMsgSendInfo(void *src, void **dst);
int32_t  schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
void     schFreeJobImpl(void *job);
int32_t  schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t  schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t  schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
int32_t  schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code);
void     schFreeRpcCtx(SRpcCtx *pCtx);
int32_t  schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
bool     schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
int32_t  schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
int32_t  schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp);
int32_t  schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
void     schProcessOnDataFetched(SSchJob *job);
int32_t  schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
void     schFreeRpcCtxVal(const void *arg);
int32_t  schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t  schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId);
int32_t  schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
int32_t  schUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
int32_t  schCancelJob(SSchJob *pJob);
int32_t  schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
D
dapan1121 已提交
525
uint64_t schGenTaskId(void);
H
Hongze Cheng 已提交
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566
void     schCloseJobRef(void);
int32_t  schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t  schJobFetchRows(SSchJob *pJob);
int32_t  schJobFetchRowsA(SSchJob *pJob);
int32_t  schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t  schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
char    *schGetOpStr(SCH_OP_TYPE type);
int32_t  schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t  schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
int32_t  schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
int32_t  schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes);
int32_t  schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet);
int32_t  schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
void     schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
int32_t  schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq);
void     schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t  schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId);
void     schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask);
bool     schJobDone(SSchJob *pJob);
int32_t  schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask);
int32_t  schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask);
int32_t  schSwitchJobStatus(SSchJob *pJob, int32_t status, void *param);
int32_t  schHandleOpBeginEvent(int64_t jobId, SSchJob **job, SCH_OP_TYPE type, SSchedulerReq *pReq);
int32_t  schHandleOpEndEvent(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
int32_t  schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask);
void     schUpdateJobErrCode(SSchJob *pJob, int32_t errCode);
int32_t  schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry);
int32_t  schProcessOnJobFailure(SSchJob *pJob, int32_t errCode);
int32_t  schProcessOnJobPartialSuccess(SSchJob *pJob);
void     schFreeTask(SSchJob *pJob, SSchTask *pTask);
void     schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t  schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t  schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t  schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t  schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void     schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);
int32_t  schHandleJobFailure(SSchJob *pJob, int32_t errCode);
int32_t  schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool     schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
int32_t  schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
int32_t  schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp);
D
dapan1121 已提交
567

D
dapan1121 已提交
568 569
extern SSchDebug gSCHDebug;

H
refact  
Hongze Cheng 已提交
570 571 572 573
#ifdef __cplusplus
}
#endif

D
dapan1121 已提交
574
#endif /*_TD_SCHEDULER_INT_H_*/