schedulerInt.h 16.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_SCHEDULER_INT_H_
#define _TD_SCHEDULER_INT_H_

#ifdef __cplusplus
extern "C" {
#endif

23 24 25 26
#include "os.h"
#include "tarray.h"
#include "planner.h"
#include "scheduler.h"
27
#include "thash.h"
D
dapan1121 已提交
28
#include "trpc.h"
D
dapan1121 已提交
29
#include "command.h"
30

D
dapan1121 已提交
31 32
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
D
dapan1121 已提交
33
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200  // unit is TSDB_TABLE_NUM_UNIT
34

D
dapan1121 已提交
35 36 37 38
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000

#define SCH_TASK_MAX_EXEC_TIMES 5
39
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
D
dapan 已提交
40

D
dapan1121 已提交
41 42 43 44 45
enum {
  SCH_READ = 1,
  SCH_WRITE,
};

D
dapan1121 已提交
46 47 48 49 50
enum {
  SCH_EXEC_CB = 1,
  SCH_FETCH_CB,
};

D
dapan1121 已提交
51 52 53 54 55 56
typedef enum {
  SCH_OP_NULL = 0,
  SCH_OP_EXEC,
  SCH_OP_FETCH,
} SCH_OP_TYPE;

D
dapan1121 已提交
57
typedef struct SSchTrans {
D
dapan1121 已提交
58 59
  void *pTrans;
  void *pHandle;
D
dapan1121 已提交
60 61
} SSchTrans;

D
dapan1121 已提交
62 63
typedef struct SSchHbTrans {
  SRWLatch  lock;
D
dapan1121 已提交
64
  int64_t   taskNum;
D
dapan1121 已提交
65
  SRpcCtx   rpcCtx;
D
dapan1121 已提交
66 67 68
  SSchTrans trans;
} SSchHbTrans;

D
dapan1121 已提交
69 70
typedef struct SSchApiStat {

wafwerar's avatar
wafwerar 已提交
71 72 73 74
#ifdef WINDOWS
  size_t avoidCompilationErrors;
#endif

D
dapan1121 已提交
75 76 77 78
} SSchApiStat;

typedef struct SSchRuntimeStat {

wafwerar's avatar
wafwerar 已提交
79 80 81 82
#ifdef WINDOWS
  size_t avoidCompilationErrors;
#endif

D
dapan1121 已提交
83 84 85 86
} SSchRuntimeStat;

typedef struct SSchJobStat {

wafwerar's avatar
wafwerar 已提交
87 88 89 90
#ifdef WINDOWS
  size_t avoidCompilationErrors;
#endif

D
dapan1121 已提交
91 92
} SSchJobStat;

D
dapan1121 已提交
93
typedef struct SSchStat {
D
dapan1121 已提交
94 95 96
  SSchApiStat      api;
  SSchRuntimeStat  runtime;
  SSchJobStat      job;
D
dapan1121 已提交
97
} SSchStat;
D
dapan1121 已提交
98

D
dapan1121 已提交
99
typedef struct SSchResInfo {
D
dapan1121 已提交
100 101
  SQueryResult*          queryRes;
  void**                 fetchRes;
D
dapan1121 已提交
102 103
  schedulerExecFp        execFp; 
  schedulerFetchFp       fetchFp; 
D
dapan1121 已提交
104
  void*                  userParam;
D
dapan1121 已提交
105
} SSchResInfo;
D
dapan1121 已提交
106

107
typedef struct SSchedulerMgmt {
D
dapan1121 已提交
108 109 110
  uint64_t        taskId; // sequential taksId
  uint64_t        sId;    // schedulerId
  SSchedulerCfg   cfg;
D
dapan1121 已提交
111
  bool            exit;
D
dapan1121 已提交
112
  int32_t         jobRef;
D
dapan1121 已提交
113
  int32_t         jobNum;
D
dapan1121 已提交
114
  SSchStat        stat;
D
dapan1121 已提交
115
  SRWLatch        hbLock;
D
dapan1121 已提交
116
  SHashObj       *hbConnections;
117
} SSchedulerMgmt;
118

D
dapan1121 已提交
119 120 121 122 123 124 125 126 127
typedef struct SSchCallbackParamHeader {
  bool isHbParam;
} SSchCallbackParamHeader;

typedef struct SSchTaskCallbackParam {
  SSchCallbackParamHeader head;
  uint64_t                queryId;
  int64_t                 refId;
  uint64_t                taskId;
D
dapan1121 已提交
128
  int32_t                 execId;
D
dapan1121 已提交
129
  void                   *pTrans;
D
dapan1121 已提交
130 131 132 133 134
} SSchTaskCallbackParam;

typedef struct SSchHbCallbackParam {
  SSchCallbackParamHeader head;
  SQueryNodeEpId          nodeEpId;
D
dapan1121 已提交
135
  void                   *pTrans;
D
dapan1121 已提交
136
} SSchHbCallbackParam;
D
dapan1121 已提交
137

D
dapan1121 已提交
138 139
typedef struct SSchFlowControl {
  SRWLatch  lock;
D
dapan1121 已提交
140
  bool      sorted;
D
dapan 已提交
141
  int32_t   tableNumSum;
D
dapan1121 已提交
142
  uint32_t  execTaskNum;
D
dapan1121 已提交
143
  SArray   *taskList;      // Element is SSchTask*
D
dapan1121 已提交
144 145
} SSchFlowControl;

D
dapan1121 已提交
146 147 148 149 150
typedef struct SSchNodeInfo {
  SQueryNodeAddr addr;
  void          *handle;
} SSchNodeInfo;

D
dapan 已提交
151
typedef struct SSchLevel {
D
dapan1121 已提交
152 153 154 155 156 157 158
  int32_t         level;
  int8_t          status;
  SRWLatch        lock;
  int32_t         taskFailed;
  int32_t         taskSucceed;
  int32_t         taskNum;
  int32_t         taskLaunchedNum;
D
dapan1121 已提交
159
  int32_t         taskDoneNum;
D
dapan1121 已提交
160
  SArray         *subTasks;      // Element is SQueryTask
D
dapan 已提交
161
} SSchLevel;
D
dapan1121 已提交
162

D
dapan1121 已提交
163 164 165 166 167 168 169
typedef struct SSchTaskProfile {
  int64_t startTs;
  int64_t execUseTime[SCH_TASK_MAX_EXEC_TIMES];
  int64_t waitTime;
  int64_t endTs;
} SSchTaskProfile;

D
dapan 已提交
170
typedef struct SSchTask {
D
dapan1121 已提交
171
  uint64_t             taskId;         // task id
D
dapan1121 已提交
172
  SRWLatch             lock;           // task lock
D
dapan1121 已提交
173
  int32_t              maxExecTimes;   // task may exec times
D
dapan1121 已提交
174
  int32_t              execId;        // task current execute try index
D
dapan1121 已提交
175
  SSchLevel           *level;          // level
D
dapan1121 已提交
176
  SRWLatch             planLock;       // task update plan lock
D
dapan1121 已提交
177 178 179 180
  SSubplan            *plan;           // subplan
  char                *msg;            // operator tree
  int32_t              msgLen;         // msg length
  int8_t               status;         // task status
D
dapan1121 已提交
181
  int32_t              lastMsgType;    // last sent msg type
D
dapan1121 已提交
182
  int64_t              timeoutUsec;    // taks timeout useconds before reschedule
D
dapan1121 已提交
183
  SQueryNodeAddr       succeedAddr;    // task executed success node address
184 185
  int8_t               candidateIdx;   // current try condidation index
  SArray              *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
D
dapan1121 已提交
186 187
  SHashObj            *execNodes;      // all tried node for current task, element is SSchNodeInfo
  SSchTaskProfile      profile;        // task execution profile
D
dapan1121 已提交
188 189 190
  int32_t              childReady;     // child task ready number
  SArray              *children;       // the datasource tasks,from which to fetch the result, element is SQueryTask*
  SArray              *parents;        // the data destination tasks, get data from current task, element is SQueryTask*
D
dapan1121 已提交
191 192
  void*                handle;         // task send handle 
  bool                 registerdHb;    // registered in hb
D
dapan 已提交
193
} SSchTask;
D
dapan1121 已提交
194

D
dapan 已提交
195
typedef struct SSchJobAttr {
D
dapan1121 已提交
196 197 198
  EExplainMode explainMode;
  bool         queryJob;
  bool         needFlowCtrl;
D
dapan 已提交
199
} SSchJobAttr;
D
dapan1121 已提交
200

D
dapan1121 已提交
201 202 203 204 205
typedef struct {
  int32_t     op;
  bool        sync;
} SSchOpStatus;

D
dapan 已提交
206
typedef struct SSchJob {
D
dapan1121 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
  int64_t            refId;
  uint64_t           queryId;
  SSchJobAttr        attr;
  int32_t            levelNum;
  int32_t            taskNum;
  SRequestConnInfo   conn;
  SArray            *nodeList;   // qnode/vnode list, SArray<SQueryNodeLoad>
  SArray            *levels;    // starting from 0. SArray<SSchLevel>
  SQueryPlan        *pDag;  

  SArray            *dataSrcTasks; // SArray<SQueryTask*>
  int32_t            levelIdx;
  SEpSet             dataSrcEps;
  SHashObj          *taskList;
  SHashObj          *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask*
  SHashObj          *flowCtrl;  // key is ep, element is SSchFlowControl

  SExplainCtx       *explainCtx;
  int8_t             status;  
  SQueryNodeAddr     resNode;
  tsem_t             rspSem;
  SSchOpStatus       opStatus;
  schedulerChkKillFp chkKillFp;
  void*              chkKillParam;
  SSchTask          *fetchTask;
  int32_t            errCode;
  SRWLatch           resLock;
  SQueryExecRes      execRes;
  void              *resData;         //TODO free it or not
  int32_t            resNumOfRows;
  SSchResInfo        userRes;
  const char        *sql;
239
  SQueryProfileSummary summary;
D
dapan 已提交
240
} SSchJob;
D
dapan1121 已提交
241

D
dapan1121 已提交
242 243
extern SSchedulerMgmt schMgmt;

D
dapan1121 已提交
244 245 246
#define SCH_LOG_TASK_START_TS(_task)                          \
  do {                                                        \
    int64_t us = taosGetTimestampUs();                        \
D
dapan1121 已提交
247
    int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
D
dapan1121 已提交
248
    (_task)->profile.execUseTime[idx] = us;                    \
D
dapan1121 已提交
249
    if (0 == (_task)->execId) {                              \
D
dapan1121 已提交
250 251 252 253 254
      (_task)->profile.startTs = us;                          \
    }                                                         \
  } while (0)  

#define SCH_LOG_TASK_WAIT_TS(_task)                        \
D
dapan1121 已提交
255 256
  do {                                                    \
    int64_t us = taosGetTimestampUs();                    \
D
dapan1121 已提交
257
    int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
D
dapan1121 已提交
258
    (_task)->profile.waitTime += us - (_task)->profile.execUseTime[idx];    \
D
dapan1121 已提交
259 260
  } while (0)  

D
dapan1121 已提交
261

D
dapan1121 已提交
262 263 264
#define SCH_LOG_TASK_END_TS(_task)                        \
  do {                                                    \
    int64_t us = taosGetTimestampUs();                    \
D
dapan1121 已提交
265
    int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
D
dapan1121 已提交
266
    (_task)->profile.execUseTime[idx] = us - (_task)->profile.execUseTime[idx];    \
D
dapan1121 已提交
267 268 269
    (_task)->profile.endTs = us;                          \
  } while (0)  

D
dapan1121 已提交
270
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execId % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec)
D
dapan1121 已提交
271

D
dapan1121 已提交
272
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
D
dapan1121 已提交
273

D
dapan1121 已提交
274 275 276
#define SCH_LOCK_TASK(_task) SCH_LOCK(SCH_WRITE, &(_task)->lock)
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)

D
dapan1121 已提交
277
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
D
dapan1121 已提交
278
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
D
dapan1121 已提交
279

D
dapan1121 已提交
280 281 282
#define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
283

H
Haojun Liao 已提交
284
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
D
dapan1121 已提交
285
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
D
dapan1121 已提交
286 287
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))

D
dapan1121 已提交
288 289
#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL)
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
D
dapan1121 已提交
290

H
Haojun Liao 已提交
291
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
D
dapan1121 已提交
292
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
D
dapan1121 已提交
293
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))
D
dapan1121 已提交
294

D
dapan1121 已提交
295 296 297 298
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.sync)
#define SCH_JOB_IN_ASYNC_EXEC_OP(job) (((job)->opStatus.op == SCH_OP_EXEC) && (!(job)->opStatus.sync))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job) (((job)->opStatus.op == SCH_OP_FETCH) && (!(job)->opStatus.sync))

D
dapan1121 已提交
299 300
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
D
dapan1121 已提交
301
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
D
dapan1121 已提交
302 303 304 305

#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) 
#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job)
D
dapan1121 已提交
306 307
#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
D
dapan1121 已提交
308
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
D
dapan1121 已提交
309
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
D
dapan1121 已提交
310 311 312 313
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH)
#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen)))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
D
dapan1121 已提交
314

D
dapan1121 已提交
315
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
X
Xiaoyu Wang 已提交
316 317
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
D
dapan1121 已提交
318
#define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps)
D
dapan1121 已提交
319

H
Haojun Liao 已提交
320 321
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
S
Shengliang Guan 已提交
322 323

#define SCH_TASK_ELOG(param, ...) \
D
dapan1121 已提交
324
  qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
S
Shengliang Guan 已提交
325
#define SCH_TASK_DLOG(param, ...) \
D
dapan1121 已提交
326
  qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
D
dapan1121 已提交
327
#define SCH_TASK_DLOGL(param, ...) \
D
dapan1121 已提交
328
  qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
S
Shengliang Guan 已提交
329
#define SCH_TASK_WLOG(param, ...) \
D
dapan1121 已提交
330
  qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
D
dapan1121 已提交
331 332 333 334

#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
335

D
dapan1121 已提交
336 337 338
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))

339

D
dapan1121 已提交
340 341
void    schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void    schCleanClusterHb(void* pTrans);
D
dapan1121 已提交
342 343
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
D
dapan1121 已提交
344 345
SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId);
D
dapan1121 已提交
346
void    schFreeFlowCtrl(SSchJob *pJob);
D
dapan1121 已提交
347
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
D
dapan1121 已提交
348 349 350 351 352
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schFetchFromRemote(SSchJob *pJob);
D
dapan1121 已提交
353
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
D
dapan1121 已提交
354
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction);
D
dapan1121 已提交
355
int32_t schCloneSMsgSendInfo(void *src, void **dst);
D
dapan1121 已提交
356
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
D
dapan1121 已提交
357
void    schFreeJobImpl(void *job);
D
dapan1121 已提交
358 359 360
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
D
dapan1121 已提交
361
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code);
D
dapan1121 已提交
362
void    schFreeRpcCtx(SRpcCtx *pCtx);
D
dapan1121 已提交
363
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
D
dapan1121 已提交
364
bool    schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
D
dapan1121 已提交
365
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
D
dapan1121 已提交
366
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp);
D
dapan1121 已提交
367
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
D
dapan1121 已提交
368
void    schProcessOnDataFetched(SSchJob *job);
369
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
D
dapan1121 已提交
370
void    schFreeRpcCtxVal(const void *arg);
D
dapan1121 已提交
371
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
D
dapan1121 已提交
372
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId);
D
dapan1121 已提交
373
int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
D
dapan1121 已提交
374
int32_t schExecJobImpl(SSchedulerReq *pReq, SSchJob *pJob, bool sync);
D
dapan1121 已提交
375
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
D
dapan1121 已提交
376 377 378
int32_t schCancelJob(SSchJob *pJob);
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
uint64_t schGenTaskId(void);
D
dapan1121 已提交
379
void    schCloseJobRef(void);
D
dapan1121 已提交
380 381
int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes);
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
D
dapan1121 已提交
382 383
int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob);
D
dapan1121 已提交
384
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
D
dapan1121 已提交
385
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
D
dapan1121 已提交
386 387
void    schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
char*   schGetOpStr(SCH_OP_TYPE type);
D
dapan1121 已提交
388
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
D
dapan1121 已提交
389 390
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob);
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes);
D
dapan1121 已提交
391 392
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet);
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode);
D
dapan1121 已提交
393

D
dapan 已提交
394

H
refact  
Hongze Cheng 已提交
395 396 397 398
#ifdef __cplusplus
}
#endif

D
dapan1121 已提交
399
#endif /*_TD_SCHEDULER_INT_H_*/