schInt.h 26.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_SCHEDULER_INT_H_
#define _TD_SCHEDULER_INT_H_

#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
23
#include "command.h"
24 25 26
#include "os.h"
#include "planner.h"
#include "scheduler.h"
H
Hongze Cheng 已提交
27
#include "tarray.h"
28
#include "thash.h"
D
dapan1121 已提交
29
#include "trpc.h"
30
#include "ttimer.h"
31

D
dapan1121 已提交
32 33 34 35 36
enum {
  SCH_READ = 1,
  SCH_WRITE,
};

D
dapan1121 已提交
37 38 39 40 41
enum {
  SCH_EXEC_CB = 1,
  SCH_FETCH_CB,
};

D
dapan1121 已提交
42 43 44 45
typedef enum {
  SCH_OP_NULL = 0,
  SCH_OP_EXEC,
  SCH_OP_FETCH,
D
dapan1121 已提交
46
  SCH_OP_GET_STATUS,
D
dapan1121 已提交
47 48
} SCH_OP_TYPE;

D
dapan1121 已提交
49 50 51 52 53 54
typedef enum {
  SCH_LOAD_SEQ = 1,
  SCH_RANDOM,
  SCH_ALL,
} SCH_POLICY;

H
Hongze Cheng 已提交
55 56
#define SCHEDULE_DEFAULT_MAX_JOB_NUM        1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM       1000
D
dapan1121 已提交
57
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200  // unit is TSDB_TABLE_NUM_UNIT
H
Hongze Cheng 已提交
58 59
#define SCHEDULE_DEFAULT_POLICY             SCH_LOAD_SEQ
#define SCHEDULE_DEFAULT_MAX_NODE_NUM       20
D
dapan1121 已提交
60

D
dapan1121 已提交
61 62
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 60000000
#define SCH_MAX_TASK_TIMEOUT_USEC     300000000
H
Hongze Cheng 已提交
63 64
#define SCH_DEFAULT_MAX_RETRY_NUM     6
#define SCH_MIN_AYSNC_EXEC_NUM        3
D
dapan1121 已提交
65

D
dapan1121 已提交
66
typedef struct SSchDebug {
H
Hongze Cheng 已提交
67 68
  bool lockEnable;
  bool apiEnable;
D
dapan1121 已提交
69 70
} SSchDebug;

D
dapan1121 已提交
71
typedef struct SSchTrans {
D
dapan1121 已提交
72 73
  void *pTrans;
  void *pHandle;
D
dapan1121 已提交
74 75
} SSchTrans;

D
dapan1121 已提交
76 77
typedef struct SSchHbTrans {
  SRWLatch  lock;
D
dapan1121 已提交
78
  int64_t   taskNum;
D
dapan1121 已提交
79
  SRpcCtx   rpcCtx;
D
dapan1121 已提交
80 81 82
  SSchTrans trans;
} SSchHbTrans;

D
dapan1121 已提交
83
typedef struct SSchApiStat {
wafwerar's avatar
wafwerar 已提交
84
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
wafwerar's avatar
wafwerar 已提交
85 86 87
  size_t avoidCompilationErrors;
#endif

D
dapan1121 已提交
88 89 90
} SSchApiStat;

typedef struct SSchRuntimeStat {
wafwerar's avatar
wafwerar 已提交
91
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
wafwerar's avatar
wafwerar 已提交
92 93 94
  size_t avoidCompilationErrors;
#endif

D
dapan1121 已提交
95 96 97
} SSchRuntimeStat;

typedef struct SSchJobStat {
wafwerar's avatar
wafwerar 已提交
98
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
wafwerar's avatar
wafwerar 已提交
99 100 101
  size_t avoidCompilationErrors;
#endif

D
dapan1121 已提交
102 103
} SSchJobStat;

D
dapan1121 已提交
104
typedef struct SSchStat {
H
Hongze Cheng 已提交
105 106 107
  SSchApiStat     api;
  SSchRuntimeStat runtime;
  SSchJobStat     job;
D
dapan1121 已提交
108
} SSchStat;
D
dapan1121 已提交
109

D
dapan1121 已提交
110
typedef struct SSchResInfo {
H
Hongze Cheng 已提交
111 112 113 114 115
  SExecResult     *execRes;
  void           **fetchRes;
  schedulerExecFp  execFp;
  schedulerFetchFp fetchFp;
  void            *cbParam;
D
dapan1121 已提交
116
} SSchResInfo;
D
dapan1121 已提交
117

D
dapan1121 已提交
118 119 120 121 122 123
typedef struct SSchOpEvent {
  SCH_OP_TYPE    type;
  bool           begin;
  SSchedulerReq *pReq;
} SSchOpEvent;

H
Hongze Cheng 已提交
124 125 126
typedef int32_t (*schStatusEnterFp)(void *pHandle, void *pParam);
typedef int32_t (*schStatusLeaveFp)(void *pHandle, void *pParam);
typedef int32_t (*schStatusEventFp)(void *pHandle, void *pParam, void *pEvent);
D
dapan1121 已提交
127 128 129 130 131 132 133 134

typedef struct SSchStatusFps {
  EJobTaskType     status;
  schStatusEnterFp enterFp;
  schStatusLeaveFp leaveFp;
  schStatusEventFp eventFp;
} SSchStatusFps;

D
dapan1121 已提交
135 136 137 138 139 140 141
typedef struct SSchedulerCfg {
  uint32_t   maxJobNum;
  int32_t    maxNodeTableNum;
  SCH_POLICY schPolicy;
  bool       enableReSchedule;
} SSchedulerCfg;

142
typedef struct SSchedulerMgmt {
H
Hongze Cheng 已提交
143 144 145 146 147 148
  uint64_t      taskId;  // sequential taksId
  uint64_t      sId;     // schedulerId
  SSchedulerCfg cfg;
  bool          exit;
  int32_t       jobRef;
  int32_t       jobNum;
dengyihao's avatar
dengyihao 已提交
149
  SSchStat      stat;
150
  void         *timer;
H
Hongze Cheng 已提交
151 152 153
  SRWLatch      hbLock;
  SHashObj     *hbConnections;
  void         *queryMgmt;
154
} SSchedulerMgmt;
155

D
dapan1121 已提交
156 157 158 159 160 161 162 163 164
typedef struct SSchCallbackParamHeader {
  bool isHbParam;
} SSchCallbackParamHeader;

typedef struct SSchTaskCallbackParam {
  SSchCallbackParamHeader head;
  uint64_t                queryId;
  int64_t                 refId;
  uint64_t                taskId;
D
dapan1121 已提交
165
  int32_t                 execId;
D
dapan1121 已提交
166
  void                   *pTrans;
D
dapan1121 已提交
167 168 169 170 171
} SSchTaskCallbackParam;

typedef struct SSchHbCallbackParam {
  SSchCallbackParamHeader head;
  SQueryNodeEpId          nodeEpId;
D
dapan1121 已提交
172
  void                   *pTrans;
D
dapan1121 已提交
173
} SSchHbCallbackParam;
D
dapan1121 已提交
174

D
dapan1121 已提交
175
typedef struct SSchFlowControl {
H
Hongze Cheng 已提交
176 177 178 179 180
  SRWLatch lock;
  bool     sorted;
  int32_t  tableNumSum;
  uint32_t execTaskNum;
  SArray  *taskList;  // Element is SSchTask*
D
dapan1121 已提交
181 182
} SSchFlowControl;

D
dapan1121 已提交
183 184 185 186 187
typedef struct SSchNodeInfo {
  SQueryNodeAddr addr;
  void          *handle;
} SSchNodeInfo;

D
dapan 已提交
188
typedef struct SSchLevel {
H
Hongze Cheng 已提交
189 190 191 192 193 194 195 196 197
  int32_t  level;
  int8_t   status;
  SRWLatch lock;
  int32_t  taskFailed;
  int32_t  taskSucceed;
  int32_t  taskNum;
  int32_t  taskLaunchedNum;
  int32_t  taskDoneNum;
  SArray  *subTasks;  // Element is SSchTask
D
dapan 已提交
198
} SSchLevel;
D
dapan1121 已提交
199

D
dapan1121 已提交
200
typedef struct SSchTaskProfile {
H
Hongze Cheng 已提交
201 202 203 204
  int64_t startTs;
  SArray *execTime;
  int64_t waitTime;
  int64_t endTs;
D
dapan1121 已提交
205 206
} SSchTaskProfile;

207 208 209 210 211 212 213 214 215 216
typedef struct SSchRedirectCtx {
  int32_t periodMs;
  bool    inRedirect;
  int32_t totalTimes;
  int32_t roundTotal;
  int32_t roundTimes;  // retry times in current round
  int64_t startTs;
} SSchRedirectCtx;

typedef struct SSchTimerParam {
dengyihao's avatar
dengyihao 已提交
217 218 219
  int64_t  rId;
  uint64_t queryId;
  uint64_t taskId;
220 221
} SSchTimerParam;

D
dapan 已提交
222
typedef struct SSchTask {
H
Hongze Cheng 已提交
223 224 225 226 227
  uint64_t        taskId;          // task id
  SRWLatch        lock;            // task reentrant lock
  int32_t         maxExecTimes;    // task max exec times
  int32_t         maxRetryTimes;   // task max retry times
  int32_t         retryTimes;      // task retry times
228 229 230
  int32_t         delayExecMs;     // task execution delay time
  tmr_h           delayTimer;      // task delay execution timer
  SSchRedirectCtx redirectCtx;     // task redirect context
H
Hongze Cheng 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
  bool            waitRetry;       // wait for retry
  int32_t         execId;          // task current execute index
  SSchLevel      *level;           // level
  SRWLatch        planLock;        // task update plan lock
  SSubplan       *plan;            // subplan
  char           *msg;             // operator tree
  int32_t         msgLen;          // msg length
  int8_t          status;          // task status
  int32_t         lastMsgType;     // last sent msg type
  int64_t         timeoutUsec;     // task timeout useconds before reschedule
  SQueryNodeAddr  succeedAddr;     // task executed success node address
  int8_t          candidateIdx;    // current try condidation index
  SArray         *candidateAddrs;  // condidate node addresses, element is SQueryNodeAddr
  SHashObj       *execNodes;       // all tried node for current task, element is SSchNodeInfo
  SSchTaskProfile profile;         // task execution profile
  int32_t         childReady;      // child task ready number
  SArray         *children;        // the datasource tasks,from which to fetch the result, element is SQueryTask*
  SArray         *parents;         // the data destination tasks, get data from current task, element is SQueryTask*
  void           *handle;          // task send handle
  bool            registerdHb;     // registered in hb
D
dapan 已提交
251
} SSchTask;
D
dapan1121 已提交
252

D
dapan 已提交
253
typedef struct SSchJobAttr {
D
dapan1121 已提交
254 255
  EExplainMode explainMode;
  bool         queryJob;
D
dapan1121 已提交
256
  bool         insertJob;
D
dapan1121 已提交
257
  bool         needFetch;
D
dapan1121 已提交
258
  bool         needFlowCtrl;
D
dapan1121 已提交
259
  bool         localExec;
D
dapan 已提交
260
} SSchJobAttr;
D
dapan1121 已提交
261

D
dapan1121 已提交
262
typedef struct {
H
Hongze Cheng 已提交
263 264 265
  int32_t  op;
  SRWLatch lock;
  bool     syncReq;
D
dapan1121 已提交
266 267
} SSchOpStatus;

D
dapan 已提交
268
typedef struct SSchJob {
H
Hongze Cheng 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
  int64_t          refId;
  uint64_t         queryId;
  SSchJobAttr      attr;
  int32_t          levelNum;
  int32_t          taskNum;
  SRequestConnInfo conn;
  SArray          *nodeList;  // qnode/vnode list, SArray<SQueryNodeLoad>
  SArray          *levels;    // starting from 0. SArray<SSchLevel>
  SQueryPlan      *pDag;
  int64_t          allocatorRefId;

  SArray   *dataSrcTasks;  // SArray<SQueryTask*>
  int32_t   levelIdx;
  SEpSet    dataSrcEps;
  SHashObj *taskList;
  SHashObj *execTasks;  // executing and executed tasks, key:taskid, value:SQueryTask*
  SHashObj *flowCtrl;   // key is ep, element is SSchFlowControl

  SExplainCtx         *explainCtx;
  int8_t               status;
  SQueryNodeAddr       resNode;
  tsem_t               rspSem;
  SSchOpStatus         opStatus;
  schedulerChkKillFp   chkKillFp;
  void                *chkKillParam;
  SSchTask            *fetchTask;
  int32_t              errCode;
296
  int32_t              redirectCode;
H
Hongze Cheng 已提交
297 298 299 300
  SRWLatch             resLock;
  SExecResult          execRes;
  void                *fetchRes;  // TODO free it or not
  bool                 fetched;
301
  int64_t              resNumOfRows; // from int32_t to int64_t
H
Hongze Cheng 已提交
302 303
  SSchResInfo          userRes;
  char                *sql;
304
  SQueryProfileSummary summary;
D
dapan 已提交
305
} SSchJob;
D
dapan1121 已提交
306

D
dapan1121 已提交
307
typedef struct SSchTaskCtx {
308
  int64_t   jobRid;
D
dapan1121 已提交
309
  SSchTask *pTask;
310
  bool      asyncLaunch;
D
dapan1121 已提交
311 312
} SSchTaskCtx;

D
dapan1121 已提交
313 314
extern SSchedulerMgmt schMgmt;

H
Hongze Cheng 已提交
315 316
#define SCH_TASK_TIMEOUT(_task) \
  ((taosGetTimestampUs() - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)
D
dapan1121 已提交
317

D
dapan1121 已提交
318
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
D
dapan1121 已提交
319

H
Hongze Cheng 已提交
320
#define SCH_LOCK_TASK(_task)   SCH_LOCK(SCH_WRITE, &(_task)->lock)
D
dapan1121 已提交
321 322
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)

H
Hongze Cheng 已提交
323
#define SCH_TASK_ID(_task)  ((_task) ? (_task)->taskId : -1)
D
dapan1121 已提交
324
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
D
dapan1121 已提交
325

D
dapan1121 已提交
326
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
H
Hongze Cheng 已提交
327 328
#define SCH_IS_DATA_BIND_TASK(task) \
  (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
D
dapan1121 已提交
329
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
H
Hongze Cheng 已提交
330 331 332 333
#define SCH_IS_DATA_MERGE_TASK(task)  (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task)                                          \
  ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
   (!SCH_IS_DATA_BIND_QRY_TASK(_task)))
334

335 336 337
#define SCH_UPDATE_REDICT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
#define SCH_GET_REDICT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)

H
Haojun Liao 已提交
338
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
H
Hongze Cheng 已提交
339
#define SCH_GET_TASK_STATUS(task)     atomic_load_8(&(task)->status)
D
dapan1121 已提交
340 341
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))

H
Hongze Cheng 已提交
342
#define SCH_GET_TASK_HANDLE(_task)          ((_task) ? (_task)->handle : NULL)
D
dapan1121 已提交
343
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
D
dapan1121 已提交
344

H
Haojun Liao 已提交
345
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
H
Hongze Cheng 已提交
346
#define SCH_GET_JOB_STATUS(job)     atomic_load_8(&(job)->status)
D
dapan1121 已提交
347
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))
D
dapan1121 已提交
348

D
dapan1121 已提交
349
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.syncReq)
H
Hongze Cheng 已提交
350 351 352 353 354 355
#define SCH_JOB_IN_ASYNC_EXEC_OP(job)                                                                \
  ((SCH_OP_EXEC == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_EXEC, SCH_OP_NULL)) && \
   (!(job)->opStatus.syncReq))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job)                                                                 \
  ((SCH_OP_FETCH == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_FETCH, SCH_OP_NULL)) && \
   (!(job)->opStatus.syncReq))
D
dapan1121 已提交
356

D
dapan1121 已提交
357
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
H
Hongze Cheng 已提交
358 359 360 361
#define SCH_JOB_NEED_FLOW_CTRL(_job)     ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) \
  (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask)  (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
D
dapan1121 已提交
362
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
D
dapan1121 已提交
363

H
Hongze Cheng 已提交
364 365 366 367 368 369 370 371 372 373
#define SCH_SET_JOB_TYPE(_job, type)     \
  do {                                   \
    if ((type) != SUBPLAN_TYPE_MODIFY) { \
      (_job)->attr.queryJob = true;      \
    } else {                             \
      (_job)->attr.insertJob = true;     \
    }                                    \
  } while (0)
#define SCH_IS_QUERY_JOB(_job)   ((_job)->attr.queryJob)
#define SCH_IS_INSERT_JOB(_job)  ((_job)->attr.insertJob)
D
dapan1121 已提交
374
#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch)
H
Hongze Cheng 已提交
375 376
#define SCH_JOB_NEED_WAIT(_job)  (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job)  (SCH_IS_QUERY_JOB(_job))
D
dapan1121 已提交
377
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
H
Hongze Cheng 已提交
378 379
#define SCH_NETWORK_ERR(_code)   ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
D
dapan1121 已提交
380
  (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect))
H
Hongze Cheng 已提交
381 382 383 384 385 386 387 388
#define SCH_REDIRECT_MSGTYPE(_msgType)                                                                         \
  ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
   (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) \
  (SCH_REDIRECT_MSGTYPE(_msgType) &&                            \
   (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
#define SCH_NEED_RETRY(_msgType, _code) \
  ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
D
dapan1121 已提交
389

D
dapan1121 已提交
390
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
H
Hongze Cheng 已提交
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413
#define SCH_GET_CUR_EP(_addr)           (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
#define SCH_SWITCH_EPSET(_addr)         ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
#define SCH_TASK_NUM_OF_EPS(_addr)      ((_addr)->epSet.numOfEps)

#define SCH_LOG_TASK_START_TS(_task)               \
  do {                                             \
    int64_t us = taosGetTimestampUs();             \
    taosArrayPush((_task)->profile.execTime, &us); \
    if (0 == (_task)->execId) {                    \
      (_task)->profile.startTs = us;               \
    }                                              \
  } while (0)

#define SCH_LOG_TASK_WAIT_TS(_task)                                                                         \
  do {                                                                                                      \
    int64_t us = taosGetTimestampUs();                                                                      \
    (_task)->profile.waitTime += us - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId); \
  } while (0)

#define SCH_LOG_TASK_END_TS(_task)                                               \
  do {                                                                           \
    int64_t  us = taosGetTimestampUs();                                          \
    int32_t  idx = (_task)->execId % (_task)->maxExecTimes;                      \
D
dapan1121 已提交
414
    int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \
H
Hongze Cheng 已提交
415 416 417
    *startts = us - *startts;                                                    \
    (_task)->profile.endTs = us;                                                 \
  } while (0)
D
dapan1121 已提交
418

H
Haojun Liao 已提交
419 420
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
S
Shengliang Guan 已提交
421

H
Hongze Cheng 已提交
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
#define SCH_TASK_ELOG(param, ...)                                                                                     \
  qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
         __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...)                                                                                     \
  qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
         __VA_ARGS__)
#define SCH_TASK_TLOG(param, ...)                                                                                     \
  qTrace("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
         __VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...)                                                                                     \
  qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
          __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...)                                                                                    \
  qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
        __VA_ARGS__)

#define SCH_SET_ERRNO(_err)                     \
  do {                                          \
    if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { \
      terrno = (_err);                          \
    }                                           \
  } while (0)
#define SCH_ERR_RET(c)                \
  do {                                \
    int32_t _code = c;                \
    if (_code != TSDB_CODE_SUCCESS) { \
      SCH_SET_ERRNO(_code);           \
      return _code;                   \
    }                                 \
  } while (0)
#define SCH_RET(c)                    \
  do {                                \
    int32_t _code = c;                \
    if (_code != TSDB_CODE_SUCCESS) { \
      SCH_SET_ERRNO(_code);           \
    }                                 \
    return _code;                     \
  } while (0)
#define SCH_ERR_JRET(c)              \
  do {                               \
    code = c;                        \
    if (code != TSDB_CODE_SUCCESS) { \
      SCH_SET_ERRNO(code);           \
      goto _return;                  \
    }                                \
  } while (0)

#define SCH_LOCK_DEBUG(...)     \
  do {                          \
    if (gSCHDebug.lockEnable) { \
      qDebug(__VA_ARGS__);      \
    }                           \
  } while (0)
D
dapan1121 已提交
475 476 477

#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000

H
Hongze Cheng 已提交
478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
#define SCH_LOCK(type, _lock)                                                                        \
  do {                                                                                               \
    if (SCH_READ == (type)) {                                                                        \
      assert(atomic_load_32(_lock) >= 0);                                                            \
      SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosRLockLatch(_lock);                                                                         \
      SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32(_lock) > 0);                                                             \
    } else {                                                                                         \
      assert(atomic_load_32(_lock) >= 0);                                                            \
      SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosWLockLatch(_lock);                                                                         \
      SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY);                                   \
    }                                                                                                \
  } while (0)

#define SCH_UNLOCK(type, _lock)                                                                       \
  do {                                                                                                \
    if (SCH_READ == (type)) {                                                                         \
      assert(atomic_load_32((_lock)) > 0);                                                            \
      SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosRUnLockLatch(_lock);                                                                        \
      SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) >= 0);                                                           \
    } else {                                                                                          \
      assert(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY);                                   \
      SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosWUnLockLatch(_lock);                                                                        \
      SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) >= 0);                                                           \
    }                                                                                                 \
  } while (0)

void     schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void     schCleanClusterHb(void *pTrans);
int32_t  schLaunchTask(SSchJob *job, SSchTask *task);
515
int32_t  schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
H
Hongze Cheng 已提交
516
int32_t  schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
D
dapan1121 已提交
517
SSchJob *schAcquireJob(int64_t refId);
H
Hongze Cheng 已提交
518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549
int32_t  schReleaseJob(int64_t refId);
void     schFreeFlowCtrl(SSchJob *pJob);
int32_t  schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t  schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t  schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t  schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t  schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t  schLaunchFetchTask(SSchJob *pJob);
int32_t  schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t  schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction);
int32_t  schCloneSMsgSendInfo(void *src, void **dst);
int32_t  schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
void     schFreeJobImpl(void *job);
int32_t  schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t  schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t  schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
int32_t  schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code);
void     schFreeRpcCtx(SRpcCtx *pCtx);
int32_t  schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
bool     schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
int32_t  schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
int32_t  schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp);
int32_t  schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
void     schProcessOnDataFetched(SSchJob *job);
int32_t  schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
void     schFreeRpcCtxVal(const void *arg);
int32_t  schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t  schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId);
int32_t  schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
int32_t  schUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
int32_t  schCancelJob(SSchJob *pJob);
int32_t  schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
D
dapan1121 已提交
550
uint64_t schGenTaskId(void);
H
Hongze Cheng 已提交
551 552 553 554 555 556
void     schCloseJobRef(void);
int32_t  schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t  schJobFetchRows(SSchJob *pJob);
int32_t  schJobFetchRowsA(SSchJob *pJob);
int32_t  schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t  schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
557
char    *schDumpEpSet(SEpSet *pEpSet);
H
Hongze Cheng 已提交
558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592
char    *schGetOpStr(SCH_OP_TYPE type);
int32_t  schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t  schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
int32_t  schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
int32_t  schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes);
int32_t  schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet);
int32_t  schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
void     schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
int32_t  schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq);
void     schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t  schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId);
void     schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask);
bool     schJobDone(SSchJob *pJob);
int32_t  schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask);
int32_t  schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask);
int32_t  schSwitchJobStatus(SSchJob *pJob, int32_t status, void *param);
int32_t  schHandleOpBeginEvent(int64_t jobId, SSchJob **job, SCH_OP_TYPE type, SSchedulerReq *pReq);
int32_t  schHandleOpEndEvent(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
int32_t  schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask);
void     schUpdateJobErrCode(SSchJob *pJob, int32_t errCode);
int32_t  schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry);
int32_t  schProcessOnJobFailure(SSchJob *pJob, int32_t errCode);
int32_t  schProcessOnJobPartialSuccess(SSchJob *pJob);
void     schFreeTask(SSchJob *pJob, SSchTask *pTask);
void     schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t  schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t  schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t  schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t  schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void     schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);
int32_t  schHandleJobFailure(SSchJob *pJob, int32_t errCode);
int32_t  schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool     schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
int32_t  schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
int32_t  schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp);
D
dapan1121 已提交
593

D
dapan1121 已提交
594 595
extern SSchDebug gSCHDebug;

H
refact  
Hongze Cheng 已提交
596 597 598 599
#ifdef __cplusplus
}
#endif

D
dapan1121 已提交
600
#endif /*_TD_SCHEDULER_INT_H_*/