qwInt.h 14.7 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_QWORKER_INT_H_
#define _TD_QWORKER_INT_H_

#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
23
#include "executor.h"
wafwerar's avatar
wafwerar 已提交
24
#include "osDef.h"
H
Hongze Cheng 已提交
25
#include "plannodes.h"
S
Shengliang Guan 已提交
26
#include "qworker.h"
D
dapan1121 已提交
27
#include "tlockfree.h"
D
dapan1121 已提交
28
#include "tref.h"
S
Shengliang Guan 已提交
29
#include "trpc.h"
H
Hongze Cheng 已提交
30
#include "ttimer.h"
S
Shengliang Guan 已提交
31

D
dapan1121 已提交
32
// Sizing and timing constants for the query worker.
// NOTE(review): how each value is consumed is not visible in this header; the
// unit remarks below are inferred from the macro names only - confirm at the users.
#define QW_DEFAULT_SCHEDULER_NUMBER 100
#define QW_DEFAULT_TASK_NUMBER      10000
#define QW_DEFAULT_SCH_TASK_NUMBER  10000
#define QW_DEFAULT_SHORT_RUN_TIMES  2
#define QW_DEFAULT_HEARTBEAT_MSEC   5000    // presumably milliseconds
#define QW_SCH_TIMEOUT_MSEC         180000  // presumably milliseconds
#define QW_MIN_RES_ROWS             4096
D
dapan1121 已提交
39

D
dapan1121 已提交
40
// Task processing phases, stored in SQWTaskCtx.phase.
// Values start at 1; 0 is not assigned to any phase.
// QW_SET_PHASE() in this header skips the two FETCH phases, and
// QW_QUERY_RUNNING() tests for PRE_QUERY / PRE_CQUERY.
enum {
  QW_PHASE_PRE_QUERY = 1,
  QW_PHASE_POST_QUERY,
  QW_PHASE_PRE_FETCH,
  QW_PHASE_POST_FETCH,
  QW_PHASE_PRE_CQUERY,
  QW_PHASE_POST_CQUERY,
};

D
dapan1121 已提交
49
// Event identifiers, used as indexes into SQWTaskCtx.events[].
// Values start at 1, so slot 0 of that array has no event assigned to it.
// QW_EVENT_MAX is one past the last event and sizes the events[] array.
enum {
  QW_EVENT_CANCEL = 1,
  QW_EVENT_READY,
  QW_EVENT_FETCH,
  QW_EVENT_DROP,
  QW_EVENT_CQUERY,

  QW_EVENT_MAX,
};

// Per-event state held in SQWTaskCtx.events[].
// QW_EVENT_RECEIVED and QW_EVENT_PROCESSED are also the names of function-like
// macros in this header; the bare identifier (not followed by '(') still
// refers to these enum constants.
enum {
  QW_EVENT_NOT_RECEIVED = 0,
  QW_EVENT_RECEIVED,
  QW_EVENT_PROCESSED,
};

// Lock mode selector: the `type` argument of QW_LOCK() / QW_UNLOCK().
// QW_READ selects the read-latch path; any other value takes the write path.
enum {
  QW_READ  = 1,
  QW_WRITE = 2,
};

D
dapan1121 已提交
70 71 72 73 74
// Policy selector for lookups that miss.
// NOTE(review): judging by the names, RET_ERR reports an error and ADD creates
// the missing entry; the consumers are not visible in this header - confirm.
enum {
  QW_NOT_EXIST_RET_ERR = 1,
  QW_NOT_EXIST_ADD     = 2,
};

75
// Runtime debug switches for the query worker.
typedef struct SQWDebug {
  bool lockEnable;        // gates the output of QW_LOCK_DEBUG()
  bool statusEnable;
  bool dumpEnable;        // gates the output of QW_DUMP()
  bool sleepSimulate;     // presumably enables qwDbgSimulateSleep() - TODO confirm
  bool deadSimulate;      // presumably enables qwDbgSimulateDead() - TODO confirm
  bool redirectSimulate;  // presumably enables qwDbgSimulateRedirect() - TODO confirm
} SQWDebug;

// Single shared instance of the debug switches; defined outside this header.
extern SQWDebug gQWDebug;

D
dapan1121 已提交
86
// One slot of the SQWorkerMgmt.param[] table (see qwSetHbParam()).
// NOTE(review): field meanings are inferred from names; the code that fills
// the slot is not visible in this header.
typedef struct SQWHbParam {
  bool    inUse;
  int32_t qwrId;
  int64_t refId;
} SQWHbParam;

D
dapan1121 已提交
92
typedef struct SQWHbInfo {
dengyihao's avatar
dengyihao 已提交
93
  SSchedulerHbRsp rsp;
S
Shengliang Guan 已提交
94
  SRpcHandleInfo  connInfo;
D
dapan1121 已提交
95 96
} SQWHbInfo;

D
dapan1121 已提交
97
// Input record for phase handling: a result code and a message type.
// NOTE(review): the handlers that consume it are not visible in this header.
typedef struct SQWPhaseInput {
  int32_t code;
  int32_t msgType;
} SQWPhaseInput;

// Output record for phase handling; it currently carries no data.
// A struct with no members is not standard C, so a placeholder member is added
// when WINDOWS is defined; other builds keep the empty struct as written.
typedef struct SQWPhaseOutput {
#ifdef WINDOWS
  size_t avoidCompilationErrors;
#endif
} SQWPhaseOutput;

dengyihao's avatar
dengyihao 已提交
108 109 110 111
// Per-task status record; the value type of SQWSchStatus.tasksHash.
typedef struct SQWTaskStatus {
  int64_t refId;  // job's refId
  int32_t code;
  int8_t  status;
} SQWTaskStatus;
D
dapan1121 已提交
113

D
dapan1121 已提交
114
typedef struct SQWTaskCtx {
dengyihao's avatar
dengyihao 已提交
115 116 117 118
  SRWLatch lock;
  int8_t   phase;
  int8_t   taskType;
  int8_t   explain;
D
dapan1121 已提交
119
  int8_t   needFetch;
D
dapan1121 已提交
120
  int8_t   localExec;
D
dapan1121 已提交
121
  int32_t  msgType;
D
dapan1121 已提交
122
  int32_t  fetchType;
D
dapan1121 已提交
123
  int32_t  execId;
124
  int32_t  level;
dengyihao's avatar
dengyihao 已提交
125

126
  bool    queryGotData;
D
dapan1121 已提交
127
  bool    queryRsped;
dengyihao's avatar
dengyihao 已提交
128 129
  bool    queryEnd;
  bool    queryContinue;
D
dapan1121 已提交
130
  bool    queryExecDone;
dengyihao's avatar
dengyihao 已提交
131 132
  bool    queryInQueue;
  int32_t rspCode;
H
Hongze Cheng 已提交
133
  int64_t affectedRows;  // for insert ...select stmt
dengyihao's avatar
dengyihao 已提交
134

S
Shengliang Guan 已提交
135 136
  SRpcHandleInfo ctrlConnInfo;
  SRpcHandleInfo dataConnInfo;
dengyihao's avatar
dengyihao 已提交
137 138 139

  int8_t events[QW_EVENT_MAX];

H
Hongze Cheng 已提交
140 141 142
  SArray    *explainRes;
  void      *taskHandle;
  void      *sinkHandle;
143
  STbVerInfo tbInfo;
D
dapan1121 已提交
144
} SQWTaskCtx;
D
dapan1121 已提交
145

D
dapan1121 已提交
146
typedef struct SQWSchStatus {
D
dapan1121 已提交
147
  int64_t        hbBrokenTs;  // timestamp in msecond
D
dapan1121 已提交
148
  SRWLatch       hbConnLock;
S
Shengliang Guan 已提交
149
  SRpcHandleInfo hbConnInfo;
dengyihao's avatar
dengyihao 已提交
150
  SQueryNodeEpId hbEpId;
D
dapan1121 已提交
151
  SRWLatch       tasksLock;
S
Shengliang Guan 已提交
152
  SHashObj      *tasksHash;  // key:queryId+taskId, value: SQWTaskStatus
D
dapan1121 已提交
153
} SQWSchStatus;
D
dapan1121 已提交
154

D
dapan1121 已提交
155
// Accumulator pair: a sample count and a running total.
// NOTE(review): presumably total/num gives the average time in queue; the
// arithmetic lives in qwUpdateTimeInQueue()/qwGetTimeInQueue(), not shown here.
typedef struct SQWTimeInQ {
  uint64_t num;
  uint64_t total;
} SQWTimeInQ;

// Message-level counters for one worker.
typedef struct SQWMsgStat {
  // NOTE(review): two slots, presumably one per EQueueType value passed to
  // qwUpdateTimeInQueue() - confirm which index maps to which queue.
  SQWTimeInQ waitTime[2];
  uint64_t   queryProcessed;
  uint64_t   cqueryProcessed;
  uint64_t   fetchProcessed;
  uint64_t   rspProcessed;
  uint64_t   cancelProcessed;
  uint64_t   dropProcessed;
  uint64_t   hbProcessed;
  uint64_t   deleteProcessed;
} SQWMsgStat;

// Runtime task counters for one worker.
// NOTE(review): presumably the number of tasks started / stopped - confirm at the writers.
struct SQWRTStat {
  uint64_t startTaskNum;
  uint64_t stopTaskNum;
};
typedef struct SQWRTStat SQWRTStat;
D
dapan1121 已提交
176 177

typedef struct SQWStat {
H
Hongze Cheng 已提交
178 179
  SQWMsgStat msgStat;
  SQWRTStat  rtStat;
D
dapan1121 已提交
180 181
} SQWStat;

D
dapan1121 已提交
182
// Qnode/Vnode level task management
D
dapan1121 已提交
183 184
typedef struct SQWorker {
  int64_t     refId;
S
Shengliang Guan 已提交
185 186 187
  SQWorkerCfg cfg;
  int8_t      nodeType;
  int32_t     nodeId;
H
Hongze Cheng 已提交
188
  void       *timer;
S
Shengliang Guan 已提交
189 190 191
  tmr_h       hbTimer;
  SRWLatch    schLock;
  // SRWLatch ctxLock;
H
Hongze Cheng 已提交
192 193 194 195 196
  SHashObj *schHash;  // key: schedulerId,    value: SQWSchStatus
  SHashObj *ctxHash;  // key: queryId+taskId, value: SQWTaskCtx
  SMsgCb    msgCb;
  SQWStat   stat;
  int32_t  *destroyed;
D
dapan1121 已提交
197 198 199
} SQWorker;

typedef struct SQWorkerMgmt {
D
dapan1121 已提交
200 201 202 203 204
  SRWLatch   lock;
  int32_t    qwRef;
  int32_t    qwNum;
  SQWHbParam param[1024];
  int32_t    paramIdx;
D
dapan1121 已提交
205 206
} SQWorkerMgmt;

D
dapan1121 已提交
207 208
// Common parameter pack shared by most helpers in this module.
//   QW_FPARAMS_DEF - the parameter list for a prototype/definition
//   QW_IDS()       - the five ids, to forward from a function that has them in scope
//   QW_FPARAMS()   - `mgmt` plus the five ids, matching QW_FPARAMS_DEF
// The forwarding forms rely on the caller's variables being named exactly
// mgmt, sId, qId, tId, rId and eId.
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId
#define QW_IDS()       sId, qId, tId, rId, eId
#define QW_FPARAMS()   mgmt, QW_IDS()
D
dapan1121 已提交
210

D
dapan1121 已提交
211 212
// Atomic helpers for the 64-bit statistics counters.
// `_item` is the counter lvalue itself (its address is taken here); `_n` is the delta.
#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), (_n))
#define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), (_n))
#define QW_STAT_GET(_item)     atomic_load_64(&(_item))
D
dapan1121 已提交
214

H
Hongze Cheng 已提交
215 216 217
// Atomic accessors for ctx->events[event].
// QW_EVENT_RECEIVED / QW_EVENT_PROCESSED are function-like macros here and
// enum constants elsewhere in this header; inside the expansions below the
// bare names are not followed by '(' and so resolve to the enum constants.
#define QW_GET_EVENT(ctx, event)           atomic_load_8(&(ctx)->events[event])
#define QW_EVENT_RECEIVED(ctx, event)      (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED)
#define QW_EVENT_PROCESSED(ctx, event)     (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event)  atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
D
dapan1121 已提交
220

D
dapan1121 已提交
221
// Atomically reads ctx->phase.
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)

// Atomically stores `_value` into ctx->phase, except that QW_PHASE_PRE_FETCH and
// QW_PHASE_POST_FETCH are ignored and leave the stored phase untouched.
// `_value` is evaluated exactly once, so an argument with side effects is safe.
#define QW_SET_PHASE(ctx, _value)                                                    \
  do {                                                                               \
    const int32_t _qwNewPhase = (_value);                                            \
    if (_qwNewPhase != QW_PHASE_PRE_FETCH && _qwNewPhase != QW_PHASE_POST_FETCH) {   \
      atomic_store_8(&(ctx)->phase, _qwNewPhase);                                    \
    }                                                                                \
  } while (0)
D
dapan1121 已提交
228

dengyihao's avatar
dengyihao 已提交
229
// Unconditionally stores `code` into ctx->rspCode.
#define QW_SET_RSP_CODE(ctx, code)    atomic_store_32(&(ctx)->rspCode, (code))
// Stores `code` into ctx->rspCode only when the current value is 0
// (compare-and-exchange against 0), so a code that is already set is kept.
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, (code))

// True when the phase is QW_PHASE_PRE_QUERY or QW_PHASE_PRE_CQUERY.
// NOTE(review): the phase is loaded once per comparison, i.e. up to two separate
// atomic loads, so `ctx` must not have side effects.
#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
D
dapan1121 已提交
233

D
dapan1121 已提交
234 235 236 237 238
// Packs (qId, tId, eId) into the key buffer `id` with a fixed layout:
//   bytes [0, 8)    qId stored as uint64_t
//   bytes [8, 16)   tId stored as uint64_t
//   bytes [16, 20)  eId stored as int32_t
// The offsets come from the stored field widths, not from sizeof() of the macro
// arguments, so an argument of another integer type (e.g. an int literal)
// cannot shift or overlap the fields.
// `id` must point to at least 20 writable bytes, aligned for uint64_t access.
#define QW_SET_QTID(id, qId, tId, eId)                                        \
  do {                                                                        \
    *(uint64_t *)(id) = (qId);                                                \
    *(uint64_t *)((char *)(id) + sizeof(uint64_t)) = (tId);                   \
    *(int32_t *)((char *)(id) + sizeof(uint64_t) + sizeof(uint64_t)) = (eId); \
  } while (0)
H
Hongze Cheng 已提交
240

D
dapan1121 已提交
241 242 243 244 245
// Unpacks a key buffer laid out as
//   bytes [0, 8)    uint64_t query id
//   bytes [8, 16)   uint64_t task id
//   bytes [16, 20)  int32_t  exec id
// into the lvalues qId, tId and eId.
// The offsets come from the stored field widths, not from sizeof() of the
// destination lvalues, so a narrower destination cannot make the macro read
// from the wrong position.
// `id` must point to at least 20 readable bytes, aligned for uint64_t access.
#define QW_GET_QTID(id, qId, tId, eId)                                        \
  do {                                                                        \
    (qId) = *(uint64_t *)(id);                                                \
    (tId) = *(uint64_t *)((char *)(id) + sizeof(uint64_t));                   \
    (eId) = *(int32_t *)((char *)(id) + sizeof(uint64_t) + sizeof(uint64_t)); \
  } while (0)

// Error-propagation helpers. `c` is evaluated exactly once in each of them.

// Evaluates `c`; on anything other than TSDB_CODE_SUCCESS records it in terrno
// and returns it from the enclosing function. Falls through on success.
#define QW_ERR_RET(c)                 \
  do {                                \
    int32_t _code = (c);              \
    if (_code != TSDB_CODE_SUCCESS) { \
      terrno = _code;                 \
      return _code;                   \
    }                                 \
  } while (0)

// Evaluates `c`, records it in terrno when it is not TSDB_CODE_SUCCESS, and
// always returns it from the enclosing function.
#define QW_RET(c)                     \
  do {                                \
    int32_t _code = (c);              \
    if (_code != TSDB_CODE_SUCCESS) { \
      terrno = _code;                 \
    }                                 \
    return _code;                     \
  } while (0)

// Assigns `c` to the caller's local variable `code`; on anything other than
// TSDB_CODE_SUCCESS records it in terrno and jumps to the caller's `_return`
// label. The enclosing function must provide both `code` and `_return:`.
#define QW_ERR_JRET(c)               \
  do {                               \
    code = (c);                      \
    if (code != TSDB_CODE_SUCCESS) { \
      terrno = code;                 \
      goto _return;                  \
    }                                \
  } while (0)
D
dapan1121 已提交
272

D
dapan1121 已提交
273 274
// Worker-scoped log helpers: prefix the message with the worker pointer.
// They expect a variable named `mgmt` in the calling scope. `_param` must be a
// string literal (it is concatenated with the prefix) and at least one variadic
// argument is required, because `__VA_ARGS__` follows a comma.
#define QW_ELOG(_param, ...) qError("QW:%p " _param, mgmt, __VA_ARGS__)
#define QW_DLOG(_param, ...) qDebug("QW:%p " _param, mgmt, __VA_ARGS__)
#define QW_TLOG(_param, ...) qTrace("QW:%p " _param, mgmt, __VA_ARGS__)

// Same as QW_DLOG(), but only emitted while gQWDebug.dumpEnable is set.
#define QW_DUMP(_param, ...)                      \
  do {                                            \
    if (gQWDebug.dumpEnable) {                    \
      qDebug("QW:%p " _param, mgmt, __VA_ARGS__); \
    }                                             \
  } while (0)

// Scheduler-scoped log helpers: expect `mgmt` and `sId` in the calling scope.
// NOTE(review): SID is printed here without the "0x" prefix that the
// QW_SCH_TASK_* macros use; left unchanged to keep existing log output stable.
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)

// Task-scoped log helpers: expect `qId`, `tId` and `eId` in the calling scope.
// `param` must be a string literal and at least one variadic argument is required.
#define QW_TASK_ELOG(param, ...) qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) \
  qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)

// Variants of the task-scoped helpers for messages with no extra arguments.
#define QW_TASK_ELOG_E(param) qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
#define QW_TASK_WLOG_E(param) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
#define QW_TASK_DLOG_E(param) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
dengyihao's avatar
dengyihao 已提交
296

H
Hongze Cheng 已提交
297 298 299 300 301 302 303 304 305
// Fully-qualified log helpers: worker pointer plus scheduler, query, task and
// exec ids. They expect `mgmt`, `sId`, `qId`, `tId` and `eId` in the calling
// scope; `param` must be a string literal and at least one variadic argument
// is required.
#define QW_SCH_TASK_ELOG(param, ...)                                                                            \
  qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \
         __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...)                                                                           \
  qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \
        __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...)                                                                            \
  qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \
         __VA_ARGS__)

// Forwards its arguments to qDebug() only while gQWDebug.lockEnable is set.
#define QW_LOCK_DEBUG(...)     \
  do {                         \
    if (gQWDebug.lockEnable) { \
      qDebug(__VA_ARGS__);     \
    }                          \
  } while (0)
D
dapan1121 已提交
313 314 315

// Value the latch word is asserted to hold while it is write-locked.
// NOTE(review): the name suggests this mirrors the write flag of the latch
// implementation - confirm the two stay in sync.
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000

// Acquires `_lock` for reading when `type` is QW_READ, otherwise for writing.
// The latch word is asserted non-negative before the call; afterwards it is
// asserted positive (read) or equal to TD_RWLATCH_WRITE_FLAG_COPY (write).
// A "B" (before) and "E" (after) trace line is sent through QW_LOCK_DEBUG().
// `_lock` is evaluated several times, so it must be free of side effects.
#define QW_LOCK(type, _lock)                                                                         \
  do {                                                                                               \
    if (QW_READ == (type)) {                                                                         \
      assert(atomic_load_32((_lock)) >= 0);                                                          \
      QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32((_lock)), __FILE__, __LINE__); \
      taosRLockLatch((_lock));                                                                       \
      QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32((_lock)), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) > 0);                                                           \
    } else {                                                                                         \
      assert(atomic_load_32((_lock)) >= 0);                                                          \
      QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32((_lock)), __FILE__, __LINE__); \
      taosWLockLatch((_lock));                                                                       \
      QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32((_lock)), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY);                                 \
    }                                                                                                \
  } while (0)

// Releases `_lock`: the read side when `type` is QW_READ, otherwise the write side.
// Before the call the latch word is asserted positive (read) or equal to
// TD_RWLATCH_WRITE_FLAG_COPY (write); afterwards it is asserted non-negative.
// A "B" (before) and "E" (after) trace line is sent through QW_LOCK_DEBUG().
// `_lock` is evaluated several times, so it must be free of side effects.
#define QW_UNLOCK(type, _lock)                                                                        \
  do {                                                                                                \
    if (QW_READ == (type)) {                                                                          \
      assert(atomic_load_32((_lock)) > 0);                                                            \
      QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32((_lock)), __FILE__, __LINE__); \
      taosRUnLockLatch((_lock));                                                                      \
      QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32((_lock)), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) >= 0);                                                           \
    } else {                                                                                          \
      assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY);                                  \
      QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32((_lock)), __FILE__, __LINE__); \
      taosWUnLockLatch((_lock));                                                                      \
      QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32((_lock)), __FILE__, __LINE__); \
      assert(atomic_load_32((_lock)) >= 0);                                                           \
    }                                                                                                 \
  } while (0)
D
dapan1121 已提交
349

D
dapan1121 已提交
350 351
// Process-wide worker bookkeeping; defined outside this header.
// Its qwRef field is what qwAcquire()/qwRelease() pass to the ref-set API.
extern SQWorkerMgmt gQwMgmt;

H
Hongze Cheng 已提交
352 353 354
// Looks up the worker registered under `refId` in the ref set identified by
// gQwMgmt.qwRef (read atomically) and returns whatever taosAcquireRef() yields,
// cast to SQWorker *.
// NOTE(review): presumably every successful call must be paired with
// qwRelease(refId) - confirm against the tref API.
static FORCE_INLINE SQWorker *qwAcquire(int64_t refId) {
  const int32_t refSetId = atomic_load_32(&gQwMgmt.qwRef);
  return (SQWorker *)taosAcquireRef(refSetId, refId);
}
wafwerar's avatar
wafwerar 已提交
355
// Releases the reference registered under `refId` in the ref set identified by
// gQwMgmt.qwRef and returns the result of taosReleaseRef().
// gQwMgmt.qwRef is read with atomic_load_32(), matching qwAcquire(), so both
// sides observe the shared ref-set id the same way.
static FORCE_INLINE int32_t qwRelease(int64_t refId) {
  return taosReleaseRef(atomic_load_32(&gQwMgmt.qwRef), refId);
}
D
dapan1121 已提交
356

H
Hongze Cheng 已提交
357 358
char   *qwPhaseStr(int32_t phase);
char   *qwBufStatusStr(int32_t bufStatus);
D
dapan1121 已提交
359
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
H
Hongze Cheng 已提交
360
void    qwReleaseScheduler(int32_t rwType, SQWorker *mgmt);
D
dapan1121 已提交
361 362 363 364
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status);
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
H
Hongze Cheng 已提交
365
void    qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
D
dapan1121 已提交
366
int32_t qwKillTaskHandle(SQWTaskCtx *ctx);
D
dapan1121 已提交
367 368
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status);
int32_t qwDropTask(QW_FPARAMS_DEF);
H
Hongze Cheng 已提交
369
void    qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
D
dapan1121 已提交
370
int32_t qwOpenRef(void);
H
Hongze Cheng 已提交
371
void    qwSetHbParam(int64_t refId, SQWHbParam **pParam);
D
dapan1121 已提交
372 373
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
H
Hongze Cheng 已提交
374
void    qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch);
D
dapan1121 已提交
375
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
H
Hongze Cheng 已提交
376
void    qwFreeTaskCtx(SQWTaskCtx *ctx);
D
dapan1121 已提交
377

H
Hongze Cheng 已提交
378
void    qwDbgDumpMgmtInfo(SQWorker *mgmt);
D
dapan1121 已提交
379
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
D
dapan1121 已提交
380 381
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet);
int32_t qwAddTaskCtx(QW_FPARAMS_DEF);
H
Hongze Cheng 已提交
382 383 384
void    qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped);
void    qwDbgSimulateSleep(void);
void    qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
D
dapan1121 已提交
385

D
dapan1121 已提交
386 387 388 389 390
#ifdef __cplusplus
}
#endif

#endif /*_TD_QWORKER_INT_H_*/