schInt.h 27.3 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_SCHEDULER_INT_H_
#define _TD_SCHEDULER_INT_H_

#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
23
#include "command.h"
24 25 26
#include "os.h"
#include "planner.h"
#include "scheduler.h"
H
Hongze Cheng 已提交
27
#include "tarray.h"
28
#include "thash.h"
D
dapan1121 已提交
29
#include "trpc.h"
30
#include "ttimer.h"
31

D
dapan1121 已提交
32 33 34 35 36
/* Lock-mode selectors: SCH_LOCK / SCH_UNLOCK compare their `type` argument against SCH_READ. */
enum { SCH_READ = 1, SCH_WRITE = 2 };

D
dapan1121 已提交
37 38 39 40 41
/* Callback kind identifiers (exec vs. fetch); their users are not visible in this header. */
enum { SCH_EXEC_CB = 1, SCH_FETCH_CB = 2 };

D
dapan1121 已提交
42 43 44 45
typedef enum {
  SCH_OP_NULL = 0,
  SCH_OP_EXEC,
  SCH_OP_FETCH,
D
dapan1121 已提交
46
  SCH_OP_GET_STATUS,
D
dapan1121 已提交
47 48
} SCH_OP_TYPE;

D
dapan1121 已提交
49 50 51 52 53 54
/* Scheduling policy values; stored in SSchedulerCfg.schPolicy. */
typedef enum {
  SCH_LOAD_SEQ = 1,
  SCH_RANDOM = 2,
  SCH_ALL = 3,
} SCH_POLICY;

H
Hongze Cheng 已提交
55 56 57 58
/*
 * Scheduler default limits and default policy.
 * Where these defaults are applied is not visible in this header.
 * SCHEDULE_DEFAULT_POLICY expands to a SCH_POLICY enumerator.
 */
#define SCHEDULE_DEFAULT_MAX_JOB_NUM        1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM       1000
#define SCHEDULE_DEFAULT_POLICY             SCH_LOAD_SEQ
#define SCHEDULE_DEFAULT_MAX_NODE_NUM       20
D
dapan1121 已提交
59

D
dapan1121 已提交
60
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 5000000
D
dapan1121 已提交
61
#define SCH_MAX_TASK_TIMEOUT_USEC     300000000
H
Hongze Cheng 已提交
62 63
#define SCH_DEFAULT_MAX_RETRY_NUM     6
#define SCH_MIN_AYSNC_EXEC_NUM        3
D
dapan1121 已提交
64

D
dapan1121 已提交
65
typedef struct SSchDebug {
H
Hongze Cheng 已提交
66 67
  bool lockEnable;
  bool apiEnable;
D
dapan1121 已提交
68 69
} SSchDebug;

D
dapan1121 已提交
70
typedef struct SSchTrans {
D
dapan1121 已提交
71 72
  void *pTrans;
  void *pHandle;
D
dapan1121 已提交
73 74
} SSchTrans;

D
dapan1121 已提交
75 76
typedef struct SSchHbTrans {
  SRWLatch  lock;
D
dapan1121 已提交
77
  int64_t   taskNum;
D
dapan1121 已提交
78
  SRpcCtx   rpcCtx;
D
dapan1121 已提交
79 80 81
  SSchTrans trans;
} SSchHbTrans;

D
dapan1121 已提交
82
typedef struct SSchApiStat {
wafwerar's avatar
wafwerar 已提交
83
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
wafwerar's avatar
wafwerar 已提交
84 85 86
  size_t avoidCompilationErrors;
#endif

D
dapan1121 已提交
87 88 89
} SSchApiStat;

typedef struct SSchRuntimeStat {
wafwerar's avatar
wafwerar 已提交
90
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
wafwerar's avatar
wafwerar 已提交
91 92 93
  size_t avoidCompilationErrors;
#endif

D
dapan1121 已提交
94 95 96
} SSchRuntimeStat;

typedef struct SSchJobStat {
wafwerar's avatar
wafwerar 已提交
97
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
wafwerar's avatar
wafwerar 已提交
98 99 100
  size_t avoidCompilationErrors;
#endif

D
dapan1121 已提交
101 102
} SSchJobStat;

D
dapan1121 已提交
103
typedef struct SSchStat {
H
Hongze Cheng 已提交
104 105 106
  SSchApiStat     api;
  SSchRuntimeStat runtime;
  SSchJobStat     job;
D
dapan1121 已提交
107
} SSchStat;
D
dapan1121 已提交
108

D
dapan1121 已提交
109
typedef struct SSchResInfo {
H
Hongze Cheng 已提交
110 111 112 113 114
  SExecResult     *execRes;
  void           **fetchRes;
  schedulerExecFp  execFp;
  schedulerFetchFp fetchFp;
  void            *cbParam;
D
dapan1121 已提交
115
} SSchResInfo;
D
dapan1121 已提交
116

D
dapan1121 已提交
117 118 119 120 121 122
/* A job operation event: the operation kind, a begin flag and the associated request. */
typedef struct SSchOpEvent {
  SCH_OP_TYPE    type;   // which operation (SCH_OP_EXEC, SCH_OP_FETCH, ...)
  bool           begin;  // presumably true for the begin event, false for the end event -- confirm
  SSchedulerReq *pReq;   // request associated with the operation
} SSchOpEvent;

H
Hongze Cheng 已提交
123 124 125
/*
 * Handler signatures used by SSchStatusFps: going by the typedef names, one is
 * for entering a status, one for leaving it and one for an event in that
 * status. Each returns an int32_t code. What pHandle and pParam point to is
 * decided by the implementations, which are not visible in this header.
 */
typedef int32_t (*schStatusEnterFp)(void *pHandle, void *pParam);
typedef int32_t (*schStatusLeaveFp)(void *pHandle, void *pParam);
typedef int32_t (*schStatusEventFp)(void *pHandle, void *pParam, void *pEvent);
D
dapan1121 已提交
126 127 128 129 130 131 132 133

/* Associates one EJobTaskType status value with its enter / leave / event handlers. */
typedef struct SSchStatusFps {
  EJobTaskType     status;   // status value this entry describes
  schStatusEnterFp enterFp;  // handler of type schStatusEnterFp
  schStatusLeaveFp leaveFp;  // handler of type schStatusLeaveFp
  schStatusEventFp eventFp;  // handler of type schStatusEventFp
} SSchStatusFps;

D
dapan1121 已提交
134 135
typedef struct SSchedulerCfg {
  uint32_t   maxJobNum;
136
  int64_t    maxNodeTableNum;
D
dapan1121 已提交
137 138 139 140
  SCH_POLICY schPolicy;
  bool       enableReSchedule;
} SSchedulerCfg;

141
/* Scheduler-wide management state; one instance is declared in this header as `extern SSchedulerMgmt schMgmt`. */
typedef struct SSchedulerMgmt {
H
Hongze Cheng 已提交
142 143 144 145 146 147
  uint64_t      taskId;  // sequential taskId
  uint64_t      sId;     // schedulerId
  SSchedulerCfg cfg;
  bool          exit;
  int32_t       jobRef;
  int32_t       jobNum;
dengyihao's avatar
dengyihao 已提交
148
  SSchStat      stat;
149
  void         *timer;
H
Hongze Cheng 已提交
150 151 152
  SRWLatch      hbLock;
  SHashObj     *hbConnections;
  void         *queryMgmt;
153
} SSchedulerMgmt;
154

D
dapan1121 已提交
155 156 157 158 159 160 161 162 163
/*
 * Common first member (`head`) of SSchTaskCallbackParam and SSchHbCallbackParam.
 * isHbParam presumably tells the heartbeat variant apart from the task variant -- confirm against the callbacks.
 */
typedef struct SSchCallbackParamHeader {
  bool isHbParam;
} SSchCallbackParamHeader;

typedef struct SSchTaskCallbackParam {
  SSchCallbackParamHeader head;
  uint64_t                queryId;
  int64_t                 refId;
  uint64_t                taskId;
D
dapan1121 已提交
164
  int32_t                 execId;
D
dapan1121 已提交
165
  void                   *pTrans;
D
dapan1121 已提交
166 167 168 169 170
} SSchTaskCallbackParam;

typedef struct SSchHbCallbackParam {
  SSchCallbackParamHeader head;
  SQueryNodeEpId          nodeEpId;
D
dapan1121 已提交
171
  void                   *pTrans;
D
dapan1121 已提交
172
} SSchHbCallbackParam;
D
dapan1121 已提交
173

D
dapan1121 已提交
174
typedef struct SSchFlowControl {
H
Hongze Cheng 已提交
175 176
  SRWLatch lock;
  bool     sorted;
177
  int64_t  tableNumSum;
H
Hongze Cheng 已提交
178 179
  uint32_t execTaskNum;
  SArray  *taskList;  // Element is SSchTask*
D
dapan1121 已提交
180 181
} SSchFlowControl;

D
dapan1121 已提交
182 183 184 185 186
/* A node address paired with an opaque handle; named as the element type of SSchTask.execNodes. */
typedef struct SSchNodeInfo {
  SQueryNodeAddr addr;    // node address
  void          *handle;  // opaque handle; its meaning is not visible in this header
} SSchNodeInfo;

D
dapan 已提交
187
typedef struct SSchLevel {
H
Hongze Cheng 已提交
188 189 190 191 192 193 194
  int32_t  level;
  int8_t   status;
  SRWLatch lock;
  int32_t  taskFailed;
  int32_t  taskSucceed;
  int32_t  taskNum;
  int32_t  taskLaunchedNum;
D
dapan1121 已提交
195
  int32_t  taskExecDoneNum;
H
Hongze Cheng 已提交
196
  SArray  *subTasks;  // Element is SSchTask
D
dapan 已提交
197
} SSchLevel;
D
dapan1121 已提交
198

D
dapan1121 已提交
199
typedef struct SSchTaskProfile {
H
Hongze Cheng 已提交
200 201 202 203
  int64_t startTs;
  SArray *execTime;
  int64_t waitTime;
  int64_t endTs;
D
dapan1121 已提交
204 205
} SSchTaskProfile;

206 207 208 209 210 211 212 213 214 215
/* Per-task redirect/retry bookkeeping; embedded in SSchTask as `redirectCtx`. */
typedef struct SSchRedirectCtx {
  int32_t periodMs;    // presumably the retry period in milliseconds -- confirm
  bool    inRedirect;  // read by SCH_TASK_RETRY_NETWORK_ERR
  int32_t totalTimes;  // presumably total retry count so far -- confirm
  int32_t roundTotal;  // presumably retries allowed in one round -- confirm
  int32_t roundTimes;  // retry times in current round
  int64_t startTs;     // presumably the timestamp when redirecting started -- confirm
} SSchRedirectCtx;

typedef struct SSchTimerParam {
dengyihao's avatar
dengyihao 已提交
216 217 218
  int64_t  rId;
  uint64_t queryId;
  uint64_t taskId;
219 220
} SSchTimerParam;

D
dapan 已提交
221
/* Scheduler-side state of one task (one subplan instance) of a job. */
typedef struct SSchTask {
H
Hongze Cheng 已提交
222 223 224 225 226
  uint64_t        taskId;          // task id
  SRWLatch        lock;            // task reentrant lock
  int32_t         maxExecTimes;    // task max exec times
  int32_t         maxRetryTimes;   // task max retry times
  int32_t         retryTimes;      // task retry times
227 228 229
  int32_t         delayExecMs;     // task execution delay time
  tmr_h           delayTimer;      // task delay execution timer
  SSchRedirectCtx redirectCtx;     // task redirect context
H
Hongze Cheng 已提交
230 231
  bool            waitRetry;       // wait for retry
  int32_t         execId;          // task current execute index
232
  int32_t         failedExecId;    // last failed task execute index
H
Hongze Cheng 已提交
233 234 235 236 237 238 239 240 241
  SSchLevel      *level;           // level
  SRWLatch        planLock;        // task update plan lock
  SSubplan       *plan;            // subplan
  char           *msg;             // operator tree
  int32_t         msgLen;          // msg length
  int8_t          status;          // task status
  int32_t         lastMsgType;     // last sent msg type
  int64_t         timeoutUsec;     // task timeout in microseconds before reschedule
  SQueryNodeAddr  succeedAddr;     // address of the node on which the task executed successfully
D
dapan1121 已提交
242
  int32_t         candidateIdx;    // index of the candidate currently being tried
H
Hongze Cheng 已提交
243 244 245 246 247 248 249 250
  SArray         *candidateAddrs;  // candidate node addresses, element is SQueryNodeAddr
  SHashObj       *execNodes;       // all tried nodes for current task, element is SSchNodeInfo
  SSchTaskProfile profile;         // task execution profile
  int32_t         childReady;      // child task ready number
  SArray         *children;        // the datasource tasks, from which to fetch the result, element is SQueryTask* (NOTE(review): SQueryTask is not declared here -- confirm the element type)
  SArray         *parents;         // the data destination tasks, get data from current task, element is SQueryTask* (NOTE(review): confirm the element type)
  void           *handle;          // task send handle
  bool            registerdHb;     // registered in hb
D
dapan 已提交
251
} SSchTask;
D
dapan1121 已提交
252

D
dapan 已提交
253
typedef struct SSchJobAttr {
D
dapan1121 已提交
254 255
  EExplainMode explainMode;
  bool         queryJob;
D
dapan1121 已提交
256
  bool         insertJob;
D
dapan1121 已提交
257
  bool         needFetch;
D
dapan1121 已提交
258
  bool         needFlowCtrl;
D
dapan1121 已提交
259
  bool         localExec;
D
dapan 已提交
260
} SSchJobAttr;
D
dapan1121 已提交
261

D
dapan1121 已提交
262
typedef struct {
H
Hongze Cheng 已提交
263 264 265
  int32_t  op;
  SRWLatch lock;
  bool     syncReq;
D
dapan1121 已提交
266 267
} SSchOpStatus;

D
dapan 已提交
268
typedef struct SSchJob {
H
Hongze Cheng 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
  int64_t          refId;
  uint64_t         queryId;
  SSchJobAttr      attr;
  int32_t          levelNum;
  int32_t          taskNum;
  SRequestConnInfo conn;
  SArray          *nodeList;  // qnode/vnode list, SArray<SQueryNodeLoad>
  SArray          *levels;    // starting from 0. SArray<SSchLevel>
  SQueryPlan      *pDag;
  int64_t          allocatorRefId;

  SArray   *dataSrcTasks;  // SArray<SQueryTask*>
  int32_t   levelIdx;
  SEpSet    dataSrcEps;
  SHashObj *taskList;
  SHashObj *execTasks;  // executing and executed tasks, key:taskid, value:SQueryTask*
  SHashObj *flowCtrl;   // key is ep, element is SSchFlowControl

  SExplainCtx         *explainCtx;
  int8_t               status;
  SQueryNodeAddr       resNode;
  tsem_t               rspSem;
  SSchOpStatus         opStatus;
  schedulerChkKillFp   chkKillFp;
  void                *chkKillParam;
  SSchTask            *fetchTask;
  int32_t              errCode;
296
  int32_t              redirectCode;
H
Hongze Cheng 已提交
297 298 299 300
  SRWLatch             resLock;
  SExecResult          execRes;
  void                *fetchRes;  // TODO free it or not
  bool                 fetched;
D
dapan1121 已提交
301
  bool                 noMoreRetry;
302
  int64_t              resNumOfRows; // from int32_t to int64_t
H
Hongze Cheng 已提交
303 304
  SSchResInfo          userRes;
  char                *sql;
305
  SQueryProfileSummary summary;
D
dapan 已提交
306
} SSchJob;
D
dapan1121 已提交
307

D
dapan1121 已提交
308
typedef struct SSchTaskCtx {
309
  int64_t   jobRid;
D
dapan1121 已提交
310
  SSchTask *pTask;
311
  bool      asyncLaunch;
D
dapan1121 已提交
312 313
} SSchTaskCtx;

D
dapan1121 已提交
314 315
extern SSchedulerMgmt schMgmt;

H
Hongze Cheng 已提交
316 317
/*
 * True when more than (_task)->timeoutUsec microseconds have passed since the
 * timestamp stored at profile.execTime[execId].
 * _task is evaluated several times, and the pointer returned by taosArrayGet()
 * is dereferenced without a NULL check.
 */
#define SCH_TASK_TIMEOUT(_task) \
  ((taosGetTimestampUs() - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)
D
dapan1121 已提交
318

D
dapan1121 已提交
319
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
D
dapan1121 已提交
320

H
Hongze Cheng 已提交
321
#define SCH_LOCK_TASK(_task)   SCH_LOCK(SCH_WRITE, &(_task)->lock)
D
dapan1121 已提交
322 323
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)

H
Hongze Cheng 已提交
324
#define SCH_TASK_ID(_task)  ((_task) ? (_task)->taskId : -1)
D
dapan1121 已提交
325
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
D
dapan1121 已提交
326

D
dapan1121 已提交
327
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
H
Hongze Cheng 已提交
328 329
#define SCH_IS_DATA_BIND_TASK(task) \
  (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
D
dapan1121 已提交
330
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
H
Hongze Cheng 已提交
331 332 333 334
#define SCH_IS_DATA_MERGE_TASK(task)  (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task)                                          \
  ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
   (!SCH_IS_DATA_BIND_QRY_TASK(_task)))
335

D
dapan1121 已提交
336 337
/*
 * SCH_UPDATE_REDIRECT_CODE: compare-exchange on (job)->redirectCode with expected
 * value 0 and new value _code (presumably stores _code only while the field is
 * still 0 -- confirm against atomic_val_compare_exchange_32).
 * SCH_GET_REDIRECT_CODE: yields _code when NO_RET_REDIRECT_ERROR(_code) is false
 * or no redirect code has been recorded (redirectCode == 0); otherwise yields the
 * recorded (job)->redirectCode.
 */
#define SCH_UPDATE_REDIRECT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
#define SCH_GET_REDIRECT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
338

H
Haojun Liao 已提交
339
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
H
Hongze Cheng 已提交
340
#define SCH_GET_TASK_STATUS(task)     atomic_load_8(&(task)->status)
D
dapan1121 已提交
341 342
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))

D
dapan1121 已提交
343 344 345
#define SCH_TASK_ALREADY_LAUNCHED(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_EXEC)
#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC)

H
Hongze Cheng 已提交
346
#define SCH_GET_TASK_HANDLE(_task)          ((_task) ? (_task)->handle : NULL)
D
dapan1121 已提交
347
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
D
dapan1121 已提交
348

H
Haojun Liao 已提交
349
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
H
Hongze Cheng 已提交
350
#define SCH_GET_JOB_STATUS(job)     atomic_load_8(&(job)->status)
D
dapan1121 已提交
351
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))
D
dapan1121 已提交
352

D
dapan1121 已提交
353
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.syncReq)
H
Hongze Cheng 已提交
354 355 356 357 358 359
#define SCH_JOB_IN_ASYNC_EXEC_OP(job)                                                                \
  ((SCH_OP_EXEC == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_EXEC, SCH_OP_NULL)) && \
   (!(job)->opStatus.syncReq))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job)                                                                 \
  ((SCH_OP_FETCH == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_FETCH, SCH_OP_NULL)) && \
   (!(job)->opStatus.syncReq))
D
dapan1121 已提交
360

D
dapan1121 已提交
361
/*
 * Mark the job as needing flow control.
 * The whole expansion is parenthesized so the assignment stays intact when the
 * macro appears inside a larger expression (e.g. followed by && or ?:).
 */
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl = true)
H
Hongze Cheng 已提交
362 363 364 365
#define SCH_JOB_NEED_FLOW_CTRL(_job)     ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) \
  (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask)  (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
D
dapan1121 已提交
366
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
D
dapan1121 已提交
367
#define SCH_MULTI_LEVEL_LAUNCHED(_job) ((_job)->levelIdx != ((_job)->levelNum - 1))
D
dapan1121 已提交
368

H
Hongze Cheng 已提交
369 370 371 372 373 374 375 376 377 378
/*
 * Job-type flags kept in (_job)->attr.
 * SCH_SET_JOB_TYPE sets insertJob when `type` is SUBPLAN_TYPE_MODIFY and
 * queryJob for every other subplan type; it never clears either flag.
 */
#define SCH_SET_JOB_TYPE(_job, type)     \
  do {                                   \
    if (SUBPLAN_TYPE_MODIFY == (type)) { \
      (_job)->attr.insertJob = true;     \
    } else {                             \
      (_job)->attr.queryJob = true;      \
    }                                    \
  } while (0)
#define SCH_IS_QUERY_JOB(_job)  ((_job)->attr.queryJob)
#define SCH_IS_INSERT_JOB(_job) ((_job)->attr.insertJob)
D
dapan1121 已提交
379
#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch)
H
Hongze Cheng 已提交
380 381
#define SCH_JOB_NEED_WAIT(_job)  (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job)  (SCH_IS_QUERY_JOB(_job))
D
dapan1121 已提交
382
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
A
Alex Duan 已提交
383
#define SCH_NETWORK_ERR(_code)   ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
H
Hongze Cheng 已提交
384 385 386
#define SCH_REDIRECT_MSGTYPE(_msgType)                                                                         \
  ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
   (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
D
dapan1121 已提交
387 388 389 390 391 392 393 394 395 396 397 398 399 400
/* Network error on a task whose level equals the job's current levelIdx. */
#define SCH_LOW_LEVEL_NETWORK_ERR(_job, _task, _code) \
    (SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx))
/* Network error on a task whose level is greater than the job's current levelIdx. */
#define SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code) \
    (SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx))
/* Network error on a task whose redirect context is already marked inRedirect. */
#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) \
    (SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect)

/* Job-level retry: redirectable message type and a network error above the current level index. */
#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code)      \
   (SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code))
/*
 * Task-set retry: redirectable message type and one of
 *   - NEED_SCHEDULER_REDIRECT_ERROR(_code),
 *   - a network error at the current level index,
 *   - a network error on a task that is already redirecting.
 */
#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \
   (SCH_REDIRECT_MSGTYPE(_msgType) &&                         \
   (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code))))
/* Single-task retry: a network error on a redirectable message type, or TSDB_CODE_SCH_TIMEOUT_ERROR on any message type. */
#define SCH_TASK_NEED_RETRY(_msgType, _code) \
   ((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
D
dapan1121 已提交
401

D
dapan1121 已提交
402

D
dapan1121 已提交
403
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
H
Hongze Cheng 已提交
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426
/*
 * Endpoint-set helpers for an address object that has an `epSet` member.
 *  - SCH_GET_CUR_EP:      pointer to the endpoint currently selected by epSet.inUse.
 *  - SCH_SWITCH_EPSET:    advance epSet.inUse to the next endpoint, wrapping at
 *                         numOfEps. When numOfEps is not positive the set is left
 *                         untouched, because `% 0` is undefined behavior.
 *  - SCH_TASK_NUM_OF_EPS: number of endpoints in the set.
 * All three evaluate _addr more than once; do not pass expressions with side effects.
 */
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
#define SCH_SWITCH_EPSET(_addr)                                                        \
  (((_addr)->epSet.numOfEps > 0)                                                       \
       ? ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps) \
       : (_addr)->epSet.inUse)
#define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps)

/*
 * Record the start of one execution of _task: the current timestamp (us) is
 * appended to profile.execTime, and for the first execution (execId == 0) it is
 * also stored in profile.startTs.
 */
#define SCH_LOG_TASK_START_TS(_task)                     \
  do {                                                   \
    int64_t _startUs = taosGetTimestampUs();             \
    taosArrayPush((_task)->profile.execTime, &_startUs); \
    if ((_task)->execId == 0) {                          \
      (_task)->profile.startTs = _startUs;               \
    }                                                    \
  } while (0)

/*
 * Add the time elapsed since the start timestamp of the current execution
 * (profile.execTime[execId]) to profile.waitTime.
 */
#define SCH_LOG_TASK_WAIT_TS(_task)                                                                    \
  do {                                                                                                 \
    int64_t  _nowUs = taosGetTimestampUs();                                                            \
    int64_t *_pExecStart = (int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId);        \
    (_task)->profile.waitTime += _nowUs - *_pExecStart;                                                \
  } while (0)

/*
 * Close the timing record of the current execution of _task: the slot
 * profile.execTime[execId], which holds the start timestamp, is overwritten
 * with the elapsed microseconds, and profile.endTs is set to the current time.
 *
 * No `execId % maxExecTimes` index is computed here: it is not needed for the
 * result and would divide by zero when maxExecTimes is 0.
 * The slot is only rewritten when taosArrayGet() returns a non-NULL pointer;
 * endTs is updated in either case.
 */
#define SCH_LOG_TASK_END_TS(_task)                                               \
  do {                                                                           \
    int64_t  us = taosGetTimestampUs();                                          \
    int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \
    if (NULL != startts) {                                                       \
      *startts = us - *startts;                                                  \
    }                                                                            \
    (_task)->profile.endTs = us;                                                 \
  } while (0)
D
dapan1121 已提交
431

H
Haojun Liao 已提交
432 433
/*
 * Job-scoped error / debug logging, prefixed with the query id.
 * Both macros read `pJob->queryId`, so a variable named pJob must be in scope
 * at the call site. At least one argument must follow `param`, because
 * __VA_ARGS__ is placed after a comma.
 */
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
S
Shengliang Guan 已提交
434

H
Hongze Cheng 已提交
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
/*
 * Task-scoped logging at error / debug / trace / long-debug / warn level.
 * Every message is prefixed with the query id, task id and execution id.
 * The macros read `pJob` and `pTask` from the calling scope; pTask may be NULL,
 * in which case SCH_TASK_ID / SCH_TASK_EID yield -1. At least one argument must
 * follow `param`, because __VA_ARGS__ is placed after a comma.
 */
#define SCH_TASK_ELOG(param, ...)                                                                                     \
  qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
         __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...)                                                                                     \
  qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
         __VA_ARGS__)
#define SCH_TASK_TLOG(param, ...)                                                                                     \
  qTrace("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
         __VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...)                                                                                     \
  qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
          __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...)                                                                                    \
  qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
        __VA_ARGS__)

/* Store _err in terrno, unless it is TSDB_CODE_SCH_IGNORE_ERROR. */
#define SCH_SET_ERRNO(_err)                     \
  do {                                          \
    if ((_err) != TSDB_CODE_SCH_IGNORE_ERROR) { \
      terrno = (_err);                          \
    }                                           \
  } while (0)

/* Evaluate c once; on a non-success code, record it and return it from the enclosing function. */
#define SCH_ERR_RET(c)                \
  do {                                \
    int32_t _code = c;                \
    if (TSDB_CODE_SUCCESS != _code) { \
      SCH_SET_ERRNO(_code);           \
      return _code;                   \
    }                                 \
  } while (0)

/* Evaluate c once and always return it from the enclosing function, recording non-success codes first. */
#define SCH_RET(c)                    \
  do {                                \
    int32_t _code = c;                \
    if (TSDB_CODE_SUCCESS == _code) { \
      return _code;                   \
    }                                 \
    SCH_SET_ERRNO(_code);             \
    return _code;                     \
  } while (0)

/*
 * Assign c to the caller's `code` variable; on a non-success code, record it
 * and jump to the caller's `_return` label. Both `code` and `_return` must
 * exist in the calling function.
 */
#define SCH_ERR_JRET(c)              \
  do {                               \
    code = c;                        \
    if (TSDB_CODE_SUCCESS != code) { \
      SCH_SET_ERRNO(code);           \
      goto _return;                  \
    }                                \
  } while (0)

/* Forward the arguments to qDebug() only while gSCHDebug.lockEnable is set. */
#define SCH_LOCK_DEBUG(...)      \
  do {                           \
    if (!gSCHDebug.lockEnable) { \
      break;                     \
    }                            \
    qDebug(__VA_ARGS__);         \
  } while (0)
D
dapan1121 已提交
488 489 490

#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000

H
Hongze Cheng 已提交
491 492 493
#define SCH_LOCK(type, _lock)                                                                        \
  do {                                                                                               \
    if (SCH_READ == (type)) {                                                                        \
D
dapan1121 已提交
494
      ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before read lock");                    \
H
Hongze Cheng 已提交
495 496 497
      SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosRLockLatch(_lock);                                                                         \
      SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
D
dapan1121 已提交
498
      ASSERTS(atomic_load_32(_lock) > 0, "invalid lock value after read lock");                      \
H
Hongze Cheng 已提交
499
    } else {                                                                                         \
D
dapan1121 已提交
500
      ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before write lock");                   \
H
Hongze Cheng 已提交
501 502 503
      SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosWLockLatch(_lock);                                                                         \
      SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
D
dapan1121 已提交
504
      ASSERTS(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock");  \
H
Hongze Cheng 已提交
505 506 507 508 509 510
    }                                                                                                \
  } while (0)

#define SCH_UNLOCK(type, _lock)                                                                       \
  do {                                                                                                \
    if (SCH_READ == (type)) {                                                                         \
D
dapan1121 已提交
511
      ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock");                  \
H
Hongze Cheng 已提交
512 513 514
      SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosRUnLockLatch(_lock);                                                                        \
      SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
D
dapan1121 已提交
515
      ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock");                  \
H
Hongze Cheng 已提交
516
    } else {                                                                                          \
D
dapan1121 已提交
517
      ASSERTS(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock");  \
H
Hongze Cheng 已提交
518 519 520
      SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosWUnLockLatch(_lock);                                                                        \
      SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
D
dapan1121 已提交
521
      ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock");                 \
H
Hongze Cheng 已提交
522 523 524
    }                                                                                                 \
  } while (0)

D
dapan1121 已提交
525 526 527 528 529
/*
 * Reset (_job)->levelIdx to the last level (levelNum - 1) and log the new value.
 * The log line goes through SCH_JOB_DLOG, which reads `pJob->queryId`, so a
 * variable named pJob must be in scope at the call site regardless of what is
 * passed as _job.
 */
#define SCH_RESET_JOB_LEVEL_IDX(_job) do {                        \
  (_job)->levelIdx = (_job)->levelNum - 1;                       \
  SCH_JOB_DLOG("set job levelIdx to %d", (_job)->levelIdx);       \
} while (0)

H
Hongze Cheng 已提交
530 531 532
void     schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void     schCleanClusterHb(void *pTrans);
int32_t  schLaunchTask(SSchJob *job, SSchTask *task);
533
int32_t  schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
H
Hongze Cheng 已提交
534
int32_t  schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
D
dapan1121 已提交
535
SSchJob *schAcquireJob(int64_t refId);
H
Hongze Cheng 已提交
536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567
int32_t  schReleaseJob(int64_t refId);
void     schFreeFlowCtrl(SSchJob *pJob);
int32_t  schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t  schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t  schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t  schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t  schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t  schLaunchFetchTask(SSchJob *pJob);
int32_t  schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t  schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction);
int32_t  schCloneSMsgSendInfo(void *src, void **dst);
int32_t  schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
void     schFreeJobImpl(void *job);
int32_t  schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t  schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t  schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
int32_t  schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code);
void     schFreeRpcCtx(SRpcCtx *pCtx);
int32_t  schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
bool     schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
int32_t  schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
int32_t  schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp);
int32_t  schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
void     schProcessOnDataFetched(SSchJob *job);
int32_t  schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
void     schFreeRpcCtxVal(const void *arg);
int32_t  schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t  schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId);
int32_t  schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
int32_t  schUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
int32_t  schCancelJob(SSchJob *pJob);
int32_t  schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
D
dapan1121 已提交
568
uint64_t schGenTaskId(void);
H
Hongze Cheng 已提交
569 570 571 572 573 574
void     schCloseJobRef(void);
int32_t  schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t  schJobFetchRows(SSchJob *pJob);
int32_t  schJobFetchRowsA(SSchJob *pJob);
int32_t  schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t  schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
575
char    *schDumpEpSet(SEpSet *pEpSet);
H
Hongze Cheng 已提交
576 577 578 579 580 581
char    *schGetOpStr(SCH_OP_TYPE type);
int32_t  schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t  schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
int32_t  schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
int32_t  schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes);
int32_t  schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet);
D
dapan1121 已提交
582
int32_t  schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
H
Hongze Cheng 已提交
583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610
void     schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
int32_t  schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq);
void     schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t  schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId);
void     schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask);
bool     schJobDone(SSchJob *pJob);
int32_t  schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask);
int32_t  schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask);
int32_t  schSwitchJobStatus(SSchJob *pJob, int32_t status, void *param);
int32_t  schHandleOpBeginEvent(int64_t jobId, SSchJob **job, SCH_OP_TYPE type, SSchedulerReq *pReq);
int32_t  schHandleOpEndEvent(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
int32_t  schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask);
void     schUpdateJobErrCode(SSchJob *pJob, int32_t errCode);
int32_t  schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry);
int32_t  schProcessOnJobFailure(SSchJob *pJob, int32_t errCode);
int32_t  schProcessOnJobPartialSuccess(SSchJob *pJob);
void     schFreeTask(SSchJob *pJob, SSchTask *pTask);
void     schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t  schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t  schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t  schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t  schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void     schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);
int32_t  schHandleJobFailure(SSchJob *pJob, int32_t errCode);
int32_t  schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool     schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
int32_t  schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
int32_t  schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp);
D
dapan1121 已提交
611 612 613 614
int32_t  schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode);
int32_t  schChkResetJobRetry(SSchJob *pJob, int32_t rspCode);
void     schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask);
int32_t  schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode);
D
dapan1121 已提交
615

D
dapan1121 已提交
616 617
extern SSchDebug gSCHDebug;

H
refact  
Hongze Cheng 已提交
618 619 620 621
#ifdef __cplusplus
}
#endif

D
dapan1121 已提交
622
#endif /*_TD_SCHEDULER_INT_H_*/