schInt.h 20.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_SCHEDULER_INT_H_
#define _TD_SCHEDULER_INT_H_

#ifdef __cplusplus
extern "C" {
#endif

23 24 25 26
#include "os.h"
#include "tarray.h"
#include "planner.h"
#include "scheduler.h"
27
#include "thash.h"
D
dapan1121 已提交
28
#include "trpc.h"
D
dapan1121 已提交
29
#include "command.h"
30

D
dapan1121 已提交
31 32
/* Default scheduler capacity limits. */
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200  // unit is TSDB_TABLE_NUM_UNIT

/* Task timeout bounds in microseconds (10 s default, 60 s maximum). The per-task value lives
 * in SSchTask.timeoutUsec and is compared against taosGetTimestampUs() deltas. */
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000

/* Number of candidate endpoints per task, equal to the maximum replica count; it also scales
 * SCH_TASK_MAX_EXEC_TIMES. */
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
D
dapan 已提交
39

D
dapan1121 已提交
40 41 42 43 44
/* Latch access mode passed to SCH_LOCK / SCH_UNLOCK. */
enum {
  SCH_READ = 1,
  SCH_WRITE = 2,
};

/* Callback kinds. NOTE(review): not referenced in this header; presumably distinguishes
 * exec vs fetch user callbacks. */
enum {
  SCH_EXEC_CB = 1,
  SCH_FETCH_CB = 2,
};

/* Scheduler operation kinds; SSchOpStatus.op holds one of these. */
typedef enum {
  SCH_OP_NULL = 0,  // no operation recorded
  SCH_OP_EXEC = 1,
  SCH_OP_FETCH = 2,
  SCH_OP_GET_STATUS = 3,
} SCH_OP_TYPE;

D
dapan1121 已提交
57 58 59 60 61
/* Runtime debug switches; the global instance is gSCHDebug. */
typedef struct SSchDebug {
  bool lockEnable;  // when true, SCH_LOCK/SCH_UNLOCK trace each latch operation via qDebug
  bool apiEnable;   // NOTE(review): not read in this header; presumably enables API tracing
} SSchDebug;

/* A transport instance paired with a handle on it (both opaque here). */
typedef struct SSchTrans {
  void *pTrans;   // transport instance
  void *pHandle;  // presumably a connection handle on pTrans — confirm with users
} SSchTrans;

D
dapan1121 已提交
67 68
typedef struct SSchHbTrans {
  SRWLatch  lock;
D
dapan1121 已提交
69
  int64_t   taskNum;
D
dapan1121 已提交
70
  SRpcCtx   rpcCtx;
D
dapan1121 已提交
71 72 73
  SSchTrans trans;
} SSchHbTrans;

D
dapan1121 已提交
74 75
/* Statistics placeholders. These structs carry no real counters yet, but ISO C gives a struct
 * without named members undefined behavior (C11 6.7.2.1p8); GCC merely tolerates it as an
 * extension and MSVC rejects it. Each struct therefore keeps a placeholder member on every
 * platform, not only on Windows/Darwin. */
typedef struct SSchApiStat {
  size_t avoidCompilationErrors;  // placeholder only
} SSchApiStat;

typedef struct SSchRuntimeStat {
  size_t avoidCompilationErrors;  // placeholder only
} SSchRuntimeStat;

typedef struct SSchJobStat {
  size_t avoidCompilationErrors;  // placeholder only
} SSchJobStat;

/* Aggregate scheduler statistics (embedded in SSchedulerMgmt.stat). */
typedef struct SSchStat {
  SSchApiStat     api;
  SSchRuntimeStat runtime;
  SSchJobStat     job;
} SSchStat;
D
dapan1121 已提交
103

D
dapan1121 已提交
104
typedef struct SSchResInfo {
D
dapan1121 已提交
105
  SExecResult*           execRes;
D
dapan1121 已提交
106
  void**                 fetchRes;
D
dapan1121 已提交
107 108
  schedulerExecFp        execFp; 
  schedulerFetchFp       fetchFp; 
D
dapan1121 已提交
109
  void*                  cbParam;
D
dapan1121 已提交
110
} SSchResInfo;
D
dapan1121 已提交
111

D
dapan1121 已提交
112 113 114 115 116 117
/* An operation event: which operation, whether it is beginning, and its request. */
typedef struct SSchOpEvent {
  SCH_OP_TYPE    type;
  bool           begin;  // presumably true at operation begin and false at its end
  SSchedulerReq *pReq;
} SSchOpEvent;

/* Status-machine hooks; every argument is opaque at this level. */
typedef int32_t (*schStatusEnterFp)(void *pHandle, void *pParam);
typedef int32_t (*schStatusLeaveFp)(void *pHandle, void *pParam);
typedef int32_t (*schStatusEventFp)(void *pHandle, void *pParam, void *pEvent);

/* Enter/leave/event handlers bound to a single status value. */
typedef struct SSchStatusFps {
  EJobTaskType     status;
  schStatusEnterFp enterFp;
  schStatusLeaveFp leaveFp;
  schStatusEventFp eventFp;
} SSchStatusFps;

129
typedef struct SSchedulerMgmt {
D
dapan1121 已提交
130 131 132
  uint64_t        taskId; // sequential taksId
  uint64_t        sId;    // schedulerId
  SSchedulerCfg   cfg;
D
dapan1121 已提交
133
  bool            exit;
D
dapan1121 已提交
134
  int32_t         jobRef;
D
dapan1121 已提交
135
  int32_t         jobNum;
D
dapan1121 已提交
136
  SSchStat        stat;
D
dapan1121 已提交
137
  SRWLatch        hbLock;
D
dapan1121 已提交
138
  SHashObj       *hbConnections;
139
} SSchedulerMgmt;
140

D
dapan1121 已提交
141 142 143 144 145 146 147 148 149
/* Common first member of the scheduler's RPC callback parameters. It is the first field of both
 * SSchTaskCallbackParam and SSchHbCallbackParam, so isHbParam can be read before the concrete
 * type is known. */
typedef struct SSchCallbackParamHeader {
  bool isHbParam;  // presumably true for SSchHbCallbackParam, false for SSchTaskCallbackParam
} SSchCallbackParamHeader;

/* Callback parameter for a task-level RPC: identifies job (queryId/refId), task and attempt. */
typedef struct SSchTaskCallbackParam {
  SSchCallbackParamHeader head;
  uint64_t                queryId;
  int64_t                 refId;
  uint64_t                taskId;
  int32_t                 execId;
  void                   *pTrans;
} SSchTaskCallbackParam;

/* Callback parameter for a heartbeat RPC to one node endpoint. */
typedef struct SSchHbCallbackParam {
  SSchCallbackParamHeader head;
  SQueryNodeEpId          nodeEpId;
  void                   *pTrans;
} SSchHbCallbackParam;
D
dapan1121 已提交
159

D
dapan1121 已提交
160 161
typedef struct SSchFlowControl {
  SRWLatch  lock;
D
dapan1121 已提交
162
  bool      sorted;
D
dapan 已提交
163
  int32_t   tableNumSum;
D
dapan1121 已提交
164
  uint32_t  execTaskNum;
D
dapan1121 已提交
165
  SArray   *taskList;      // Element is SSchTask*
D
dapan1121 已提交
166 167
} SSchFlowControl;

D
dapan1121 已提交
168 169 170 171 172
typedef struct SSchNodeInfo {
  SQueryNodeAddr addr;
  void          *handle;
} SSchNodeInfo;

D
dapan 已提交
173
typedef struct SSchLevel {
D
dapan1121 已提交
174 175 176 177 178 179 180
  int32_t         level;
  int8_t          status;
  SRWLatch        lock;
  int32_t         taskFailed;
  int32_t         taskSucceed;
  int32_t         taskNum;
  int32_t         taskLaunchedNum;
D
dapan1121 已提交
181
  int32_t         taskDoneNum;
D
dapan1121 已提交
182
  SArray         *subTasks;      // Element is SSchTask
D
dapan 已提交
183
} SSchLevel;
D
dapan1121 已提交
184

D
dapan1121 已提交
185
typedef struct SSchTaskProfile {
D
dapan1121 已提交
186 187 188 189
  int64_t  startTs;
  int64_t* execTime;
  int64_t  waitTime;
  int64_t  endTs;
D
dapan1121 已提交
190 191
} SSchTaskProfile;

D
dapan 已提交
192
typedef struct SSchTask {
D
dapan1121 已提交
193
  uint64_t             taskId;         // task id
D
dapan1121 已提交
194
  SRWLatch             lock;           // task lock
D
dapan1121 已提交
195
  int32_t              maxExecTimes;   // task may exec times
D
dapan1121 已提交
196
  int32_t              execId;        // task current execute try index
D
dapan1121 已提交
197
  SSchLevel           *level;          // level
D
dapan1121 已提交
198
  SRWLatch             planLock;       // task update plan lock
D
dapan1121 已提交
199 200 201 202
  SSubplan            *plan;           // subplan
  char                *msg;            // operator tree
  int32_t              msgLen;         // msg length
  int8_t               status;         // task status
D
dapan1121 已提交
203
  int32_t              lastMsgType;    // last sent msg type
D
dapan1121 已提交
204
  int64_t              timeoutUsec;    // taks timeout useconds before reschedule
D
dapan1121 已提交
205
  SQueryNodeAddr       succeedAddr;    // task executed success node address
206 207
  int8_t               candidateIdx;   // current try condidation index
  SArray              *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
D
dapan1121 已提交
208 209
  SHashObj            *execNodes;      // all tried node for current task, element is SSchNodeInfo
  SSchTaskProfile      profile;        // task execution profile
D
dapan1121 已提交
210 211 212
  int32_t              childReady;     // child task ready number
  SArray              *children;       // the datasource tasks,from which to fetch the result, element is SQueryTask*
  SArray              *parents;        // the data destination tasks, get data from current task, element is SQueryTask*
D
dapan1121 已提交
213 214
  void*                handle;         // task send handle 
  bool                 registerdHb;    // registered in hb
D
dapan 已提交
215
} SSchTask;
D
dapan1121 已提交
216

D
dapan 已提交
217
typedef struct SSchJobAttr {
D
dapan1121 已提交
218 219
  EExplainMode explainMode;
  bool         queryJob;
D
dapan1121 已提交
220
  bool         needFetch;
D
dapan1121 已提交
221
  bool         needFlowCtrl;
D
dapan 已提交
222
} SSchJobAttr;
D
dapan1121 已提交
223

D
dapan1121 已提交
224 225
typedef struct {
  int32_t     op;
D
dapan1121 已提交
226
  bool        syncReq;
D
dapan1121 已提交
227 228
} SSchOpStatus;

D
dapan 已提交
229
typedef struct SSchJob {
D
dapan1121 已提交
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
  int64_t            refId;
  uint64_t           queryId;
  SSchJobAttr        attr;
  int32_t            levelNum;
  int32_t            taskNum;
  SRequestConnInfo   conn;
  SArray            *nodeList;   // qnode/vnode list, SArray<SQueryNodeLoad>
  SArray            *levels;    // starting from 0. SArray<SSchLevel>
  SQueryPlan        *pDag;  

  SArray            *dataSrcTasks; // SArray<SQueryTask*>
  int32_t            levelIdx;
  SEpSet             dataSrcEps;
  SHashObj          *taskList;
  SHashObj          *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask*
  SHashObj          *flowCtrl;  // key is ep, element is SSchFlowControl

  SExplainCtx       *explainCtx;
  int8_t             status;  
  SQueryNodeAddr     resNode;
  tsem_t             rspSem;
  SSchOpStatus       opStatus;
  schedulerChkKillFp chkKillFp;
  void*              chkKillParam;
  SSchTask          *fetchTask;
  int32_t            errCode;
  SRWLatch           resLock;
D
dapan1121 已提交
257
  SExecResult        execRes;
D
dapan1121 已提交
258 259 260 261
  void              *resData;         //TODO free it or not
  int32_t            resNumOfRows;
  SSchResInfo        userRes;
  const char        *sql;
262
  SQueryProfileSummary summary;
D
dapan 已提交
263
} SSchJob;
D
dapan1121 已提交
264

D
dapan1121 已提交
265 266
extern SSchedulerMgmt schMgmt;

D
dapan1121 已提交
267
/* True when the current attempt has run longer than (_task)->timeoutUsec. The attempt start is
 * profile.execTime[execId % maxExecTimes] (written by SCH_LOG_TASK_START_TS); maxExecTimes must
 * be non-zero. */
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execTime[(_task)->execId % (_task)->maxExecTimes]) > (_task)->timeoutUsec)

/* True once readyNum has reached the number of child (data-source) tasks. */
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))

/* Exclusive lock on the task's own latch. */
#define SCH_LOCK_TASK(_task) SCH_LOCK(SCH_WRITE, &(_task)->lock)
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)

/* NULL-tolerant id accessors (yield -1 for a NULL task); used by the task log macros. */
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)

/* Task classification by subplan type / level position. */
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)

/* Task status (always accessed atomically) and send handle. */
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))

#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL)
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))

/* Job status, always accessed atomically. */
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))

/* Operation-status checks. opStatus.op is compare-and-swapped by the ASYNC variants, so the
 * sync check loads it atomically as well rather than with a plain racy read.
 * NOTE(review): assuming atomic_val_compare_exchange_32(ptr, old, new) semantics, the ASYNC
 * variants reset op to SCH_OP_NULL when it matched, before syncReq is tested. */
#define SCH_JOB_IN_SYNC_OP(job) (atomic_load_32(&(job)->opStatus.op) && (job)->opStatus.syncReq)
#define SCH_JOB_IN_ASYNC_EXEC_OP(job) ((SCH_OP_EXEC == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_EXEC, SCH_OP_NULL)) && (!(job)->opStatus.syncReq))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job) ((SCH_OP_FETCH == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_FETCH, SCH_OP_NULL)) && (!(job)->opStatus.syncReq))

/* Flow control. The setter is fully parenthesized so it is safe inside larger expressions
 * (previously `x && SCH_SET_JOB_NEED_FLOW_CTRL(j)` parsed as `(x && ...) = true`). */
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl = true)
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_TASK_MAX_EXEC_TIMES(_levelIdx, _levelNum) (SCH_MAX_CANDIDATE_EP_NUM * ((_levelNum) - (_levelIdx)))

/* Job kind. */
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch)
#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)

/* Error classification for redirect / retry decisions. */
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task))))
#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) (SCH_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)

#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)

/* Endpoint-set helpers; SCH_SWITCH_EPSET rotates inUse and requires numOfEps > 0. */
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
#define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps)

/* Attempt timing. Locals carry an _sch prefix so they cannot capture identifiers used in the
 * caller's _task argument: with the old names, SCH_LOG_TASK_START_TS(tasks[idx]) initialized the
 * macro's own `idx` from itself (uninitialized read). maxExecTimes must be non-zero. */
#define SCH_LOG_TASK_START_TS(_task)                              \
  do {                                                            \
    int64_t _schUs = taosGetTimestampUs();                        \
    int32_t _schIdx = (_task)->execId % (_task)->maxExecTimes;    \
    (_task)->profile.execTime[_schIdx] = _schUs;                  \
    if (0 == (_task)->execId) {                                   \
      (_task)->profile.startTs = _schUs;                          \
    }                                                             \
  } while (0)

#define SCH_LOG_TASK_WAIT_TS(_task)                                           \
  do {                                                                        \
    int64_t _schUs = taosGetTimestampUs();                                    \
    int32_t _schIdx = (_task)->execId % (_task)->maxExecTimes;                \
    (_task)->profile.waitTime += _schUs - (_task)->profile.execTime[_schIdx]; \
  } while (0)

#define SCH_LOG_TASK_END_TS(_task)                                                        \
  do {                                                                                    \
    int64_t _schUs = taosGetTimestampUs();                                                \
    int32_t _schIdx = (_task)->execId % (_task)->maxExecTimes;                            \
    (_task)->profile.execTime[_schIdx] = _schUs - (_task)->profile.execTime[_schIdx];     \
    (_task)->profile.endTs = _schUs;                                                      \
  } while (0)


H
Haojun Liao 已提交
347 348
/* Job-scoped logging; expects a local `pJob` at the call site and at least one format argument. */
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)

/* Shared expansion for task-scoped logging: prefixes query id, task id and execution id, then
 * forwards to logFn. Expects locals `pJob` and `pTask` at the call site (pTask may be NULL,
 * since SCH_TASK_ID / SCH_TASK_EID handle it). */
#define SCH_TASK_LOG_IMPL(logFn, param, ...) \
  logFn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)

#define SCH_TASK_ELOG(param, ...) SCH_TASK_LOG_IMPL(qError, param, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) SCH_TASK_LOG_IMPL(qDebug, param, __VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) SCH_TASK_LOG_IMPL(qDebugL, param, __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) SCH_TASK_LOG_IMPL(qWarn, param, __VA_ARGS__)
D
dapan1121 已提交
358

D
dapan1121 已提交
359 360 361
/* Store _err in terrno unless it is the TSDB_CODE_SCH_IGNORE_ERROR sentinel. */
#define SCH_SET_ERRNO(_err)                       \
  do {                                            \
    if ((_err) != TSDB_CODE_SCH_IGNORE_ERROR) {   \
      terrno = (_err);                            \
    }                                             \
  } while (0)

/* Evaluate c once; on failure record it via SCH_SET_ERRNO and return it from the caller. */
#define SCH_ERR_RET(c)                  \
  do {                                  \
    int32_t _code = c;                  \
    if (TSDB_CODE_SUCCESS != _code) {   \
      SCH_SET_ERRNO(_code);             \
      return _code;                     \
    }                                   \
  } while (0)

/* Evaluate c once, record it on failure, and always return it from the caller. */
#define SCH_RET(c)                      \
  do {                                  \
    int32_t _code = c;                  \
    if (TSDB_CODE_SUCCESS != _code) {   \
      SCH_SET_ERRNO(_code);             \
    }                                   \
    return _code;                       \
  } while (0)

/* Evaluate c into the caller's `code`; on failure record it and jump to the caller's `_return`. */
#define SCH_ERR_JRET(c)                 \
  do {                                  \
    code = c;                           \
    if (TSDB_CODE_SUCCESS != code) {    \
      SCH_SET_ERRNO(code);              \
      goto _return;                     \
    }                                   \
  } while (0)
363

D
dapan1121 已提交
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
/* Trace latch operations through qDebug, but only while gSCHDebug.lockEnable is set. */
#define SCH_LOCK_DEBUG(...)      \
  do {                           \
    if (gSCHDebug.lockEnable) {  \
      qDebug(__VA_ARGS__);       \
    }                            \
  } while (0)

/* Latch value expected while write-locked, mirrored here for the sanity asserts below.
 * NOTE(review): must stay in sync with the latch implementation's write flag. */
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000

/* Acquire _lock shared (type == SCH_READ) or exclusive (any other type), asserting the latch
 * value before and after, and tracing both edges with file/line when lock debugging is on. */
#define SCH_LOCK(type, _lock)                                                                         \
  do {                                                                                                \
    if (SCH_READ != (type)) {                                                                         \
      assert(atomic_load_32((_lock)) >= 0);                                                           \
      SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosWLockLatch(_lock);                                                                          \
      SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY);                                  \
    } else {                                                                                          \
      assert(atomic_load_32((_lock)) >= 0);                                                           \
      SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosRLockLatch(_lock);                                                                          \
      SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) > 0);                                                            \
    }                                                                                                 \
  } while (0)

/* Release _lock taken by SCH_LOCK with the same type, with the mirrored asserts and tracing. */
#define SCH_UNLOCK(type, _lock)                                                                        \
  do {                                                                                                 \
    if (SCH_READ != (type)) {                                                                          \
      assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY);                                   \
      SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosWUnLockLatch(_lock);                                                                         \
      SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) >= 0);                                                            \
    } else {                                                                                           \
      assert(atomic_load_32((_lock)) > 0);                                                             \
      SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      taosRUnLockLatch(_lock);                                                                         \
      SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) >= 0);                                                            \
    }                                                                                                  \
  } while (0)
D
dapan1121 已提交
399

400

D
dapan1121 已提交
401 402
void    schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void    schCleanClusterHb(void* pTrans);
D
dapan1121 已提交
403 404
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
D
dapan1121 已提交
405 406
SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId);
D
dapan1121 已提交
407
void    schFreeFlowCtrl(SSchJob *pJob);
D
dapan1121 已提交
408
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
D
dapan1121 已提交
409 410 411 412
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
D
dapan1121 已提交
413
int32_t schLaunchFetchTask(SSchJob *pJob);
D
dapan1121 已提交
414
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
D
dapan1121 已提交
415
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction);
D
dapan1121 已提交
416
int32_t schCloneSMsgSendInfo(void *src, void **dst);
D
dapan1121 已提交
417
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
D
dapan1121 已提交
418
void    schFreeJobImpl(void *job);
D
dapan1121 已提交
419 420 421
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
D
dapan1121 已提交
422
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code);
D
dapan1121 已提交
423
void    schFreeRpcCtx(SRpcCtx *pCtx);
D
dapan1121 已提交
424
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
D
dapan1121 已提交
425
bool    schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
D
dapan1121 已提交
426
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
D
dapan1121 已提交
427
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp);
D
dapan1121 已提交
428
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
D
dapan1121 已提交
429
void    schProcessOnDataFetched(SSchJob *job);
430
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
D
dapan1121 已提交
431
void    schFreeRpcCtxVal(const void *arg);
D
dapan1121 已提交
432
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
D
dapan1121 已提交
433
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId);
D
dapan1121 已提交
434
int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
D
dapan1121 已提交
435
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
D
dapan1121 已提交
436 437 438
int32_t schCancelJob(SSchJob *pJob);
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
uint64_t schGenTaskId(void);
D
dapan1121 已提交
439
void    schCloseJobRef(void);
D
dapan1121 已提交
440
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
D
dapan1121 已提交
441 442
int32_t schJobFetchRows(SSchJob *pJob);
int32_t schJobFetchRowsA(SSchJob *pJob);
D
dapan1121 已提交
443
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
D
dapan1121 已提交
444
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
D
dapan1121 已提交
445 446
void    schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
char*   schGetOpStr(SCH_OP_TYPE type);
D
dapan1121 已提交
447
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
D
dapan1121 已提交
448
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
D
dapan1121 已提交
449
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
D
dapan1121 已提交
450
int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes);
D
dapan1121 已提交
451 452
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet);
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode);
D
dapan1121 已提交
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
void    schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode);
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq);
void    schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_t rId, uint64_t tId);
void    schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask);
bool    schJobDone(SSchJob *pJob);
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask);
int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param);
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq);
int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode);
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask);
void    schUpdateJobErrCode(SSchJob *pJob, int32_t errCode);
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry);
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob);
void    schFreeTask(SSchJob *pJob, SSchTask *pTask);
void    schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
D
dapan1121 已提交
473 474
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
D
dapan1121 已提交
475

D
dapan1121 已提交
476 477
extern SSchDebug gSCHDebug;

D
dapan 已提交
478

H
refact  
Hongze Cheng 已提交
479 480 481 482
#ifdef __cplusplus
}
#endif

D
dapan1121 已提交
483
#endif /*_TD_SCHEDULER_INT_H_*/