qworker.c 46.0 KB
Newer Older
D
dapan1121 已提交
1
#include "qworker.h"
dengyihao's avatar
dengyihao 已提交
2
#include "dataSinkMgt.h"
3
#include "executor.h"
D
dapan1121 已提交
4
#include "planner.h"
H
Haojun Liao 已提交
5 6
#include "query.h"
#include "qworkerInt.h"
D
dapan1121 已提交
7
#include "qworkerMsg.h"
dengyihao's avatar
dengyihao 已提交
8
#include "tcommon.h"
H
Haojun Liao 已提交
9
#include "tmsg.h"
10
#include "tname.h"
D
dapan1121 已提交
11

D
dapan1121 已提交
12
/* Debug switches for this file: statusEnable gates qwDbgValidateStatus,
 * dumpEnable gates qwDbgDumpMgmtInfo. Both start enabled. */
SQWDebug gQWDebug = {
    .statusEnable = true,
    .dumpEnable = true,
};
13

D
dapan1121 已提交
14 15 16
/**
 * Check whether a task status change from oriStatus to newStatus is allowed.
 *
 * The check is skipped (success is returned) when gQWDebug.statusEnable is off.
 *
 * Allowed transitions:
 *   NULL            -> EXECUTING, FAILED, NOT_START
 *   NOT_START       -> CANCELLED
 *   EXECUTING       -> PARTIAL_SUCCEED, SUCCEED, FAILED, CANCELLING, CANCELLED, DROPPING
 *   PARTIAL_SUCCEED -> EXECUTING, SUCCEED, CANCELLED, FAILED, DROPPING
 *   SUCCEED         -> CANCELLED, DROPPING, FAILED
 *   FAILED          -> CANCELLED, DROPPING
 *   CANCELLING      -> CANCELLED
 *   CANCELLED / DROPPING -> FAILED, PARTIAL_SUCCEED
 *
 * @param oriStatus current status of the task
 * @param newStatus status the caller wants to set
 * @param ignore    set to true when the request repeats EXECUTING or FAILED;
 *                  it is not written on any other path
 * @return TSDB_CODE_SUCCESS when the change is allowed (or should be ignored),
 *         TSDB_CODE_QRY_APP_ERROR otherwise
 */
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  int32_t code = 0;
  bool    allowed = false;

  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
  }

  if (oriStatus == newStatus) {
    // Repeating EXECUTING or FAILED is tolerated; any other repeat is an error.
    if (JOB_TASK_STATUS_EXECUTING == newStatus || JOB_TASK_STATUS_FAILED == newStatus) {
      *ignore = true;
      return TSDB_CODE_SUCCESS;
    }

    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
      allowed = (JOB_TASK_STATUS_EXECUTING == newStatus || JOB_TASK_STATUS_FAILED == newStatus ||
                 JOB_TASK_STATUS_NOT_START == newStatus);
      break;

    case JOB_TASK_STATUS_NOT_START:
      allowed = (JOB_TASK_STATUS_CANCELLED == newStatus);
      break;

    case JOB_TASK_STATUS_EXECUTING:
      allowed = (JOB_TASK_STATUS_PARTIAL_SUCCEED == newStatus || JOB_TASK_STATUS_SUCCEED == newStatus ||
                 JOB_TASK_STATUS_FAILED == newStatus || JOB_TASK_STATUS_CANCELLING == newStatus ||
                 JOB_TASK_STATUS_CANCELLED == newStatus || JOB_TASK_STATUS_DROPPING == newStatus);
      break;

    case JOB_TASK_STATUS_PARTIAL_SUCCEED:
      allowed = (JOB_TASK_STATUS_EXECUTING == newStatus || JOB_TASK_STATUS_SUCCEED == newStatus ||
                 JOB_TASK_STATUS_CANCELLED == newStatus || JOB_TASK_STATUS_FAILED == newStatus ||
                 JOB_TASK_STATUS_DROPPING == newStatus);
      break;

    case JOB_TASK_STATUS_SUCCEED:
      allowed = (JOB_TASK_STATUS_CANCELLED == newStatus || JOB_TASK_STATUS_DROPPING == newStatus ||
                 JOB_TASK_STATUS_FAILED == newStatus);
      break;

    case JOB_TASK_STATUS_FAILED:
      allowed = (JOB_TASK_STATUS_CANCELLED == newStatus || JOB_TASK_STATUS_DROPPING == newStatus);
      break;

    case JOB_TASK_STATUS_CANCELLING:
      allowed = (JOB_TASK_STATUS_CANCELLED == newStatus);
      break;

    case JOB_TASK_STATUS_CANCELLED:
    case JOB_TASK_STATUS_DROPPING:
      allowed = (JOB_TASK_STATUS_FAILED == newStatus || JOB_TASK_STATUS_PARTIAL_SUCCEED == newStatus);
      break;

    default:
      // Unknown source status: reported with its own message, bypassing the shared one below.
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
      return TSDB_CODE_QRY_APP_ERROR;
  }

  if (allowed) {
    return TSDB_CODE_SUCCESS;
  }

  QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);

_return:

  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  QW_RET(code);
}

dengyihao's avatar
dengyihao 已提交
99
/* Dump hook for a single scheduler entry; the body is intentionally empty for now. */
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {
  (void)sch;
  (void)i;
}
D
dapan1121 已提交
100 101 102 103 104 105 106

/**
 * Dump the number of remaining schedulers and task contexts held by mgmt.
 *
 * Does nothing when gQWDebug.dumpEnable is off. The scheduler hash is walked
 * under a read lock on mgmt->schLock and each entry is handed to
 * qwDbgDumpSchInfo together with its running index.
 */
void qwDbgDumpMgmtInfo(SQWorkerMgmt *mgmt) {
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));

  int32_t i = 0;
  for (void *pIter = taosHashIterate(mgmt->schHash, NULL); NULL != pIter;
       pIter = taosHashIterate(mgmt->schHash, pIter)) {
    qwDbgDumpSchInfo((SQWSchStatus *)pIter, i);
    ++i;
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  // The ctx hash size is read after schLock has been released, as before.
  QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));
}
D
dapan1121 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163

/**
 * Map a QW_PHASE_* value to a printable name.
 *
 * @return a string literal; "UNKNOWN" for any value not listed below
 */
char *qwPhaseStr(int32_t phase) {
  char *name = "UNKNOWN";

  switch (phase) {
    case QW_PHASE_PRE_QUERY:
      name = "PRE_QUERY";
      break;
    case QW_PHASE_POST_QUERY:
      name = "POST_QUERY";
      break;
    case QW_PHASE_PRE_FETCH:
      name = "PRE_FETCH";
      break;
    case QW_PHASE_POST_FETCH:
      name = "POST_FETCH";
      break;
    case QW_PHASE_PRE_CQUERY:
      name = "PRE_CQUERY";
      break;
    case QW_PHASE_POST_CQUERY:
      name = "POST_CQUERY";
      break;
    default:
      break;
  }

  return name;
}

/**
 * Map a DS_BUF_* value to a printable name.
 *
 * @return a string literal; "UNKNOWN" for any value not listed below
 */
char *qwBufStatusStr(int32_t bufStatus) {
  char *name = "UNKNOWN";

  switch (bufStatus) {
    case DS_BUF_LOW:
      name = "LOW";
      break;
    case DS_BUF_FULL:
      name = "FULL";
      break;
    case DS_BUF_EMPTY:
      name = "EMPTY";
      break;
    default:
      break;
  }

  return name;
}

D
dapan1121 已提交
164
/**
 * Atomically move task->status to the requested status.
 *
 * Each attempt reads the current status, validates the transition with
 * qwDbgValidateStatus and then tries a compare-and-exchange. If another
 * thread changed the status in between, the attempt is repeated from the
 * fresh value.
 *
 * @return TSDB_CODE_SUCCESS when the status was updated or the update was
 *         flagged as ignorable; otherwise the validation error
 */
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
  int32_t code = 0;
  int8_t  current = 0;
  bool    skip = false;
  bool    done = false;

  while (!done) {
    current = atomic_load_8(&task->status);

    QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), current, status, &skip));
    if (skip) {
      break;
    }

    // The exchange returns the value it found; equality means our write took effect.
    if (current == atomic_val_compare_exchange_8(&task->status, current, status)) {
      QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(current), jobTaskStatusStr(status));
      done = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
189
/**
 * Insert a new, empty scheduler entry for sId into mgmt->schHash.
 *
 * A task hash is created for the entry first. If the scheduler already exists
 * in the hash, the freshly created task hash is discarded and success is
 * returned. rwType is accepted but not used here.
 *
 * @return TSDB_CODE_SUCCESS on insert or when the entry already exists,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY when the hash init or the put fails
 */
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) {
  SQWSchStatus newSch = {0};

  newSch.tasksHash =
      taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == newSch.tasksHash) {
    QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  QW_LOCK(QW_WRITE, &mgmt->schLock);
  int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));

  // Real failure: anything other than "node already exists".
  if (0 != code && !HASH_NODE_EXIST(code)) {
    QW_UNLOCK(QW_WRITE, &mgmt->schLock);

    QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
    taosHashCleanup(newSch.tasksHash);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Entry was already present: our task hash was not stored, so drop it.
  if (0 != code) {
    taosHashCleanup(newSch.tasksHash);
  }

  QW_UNLOCK(QW_WRITE, &mgmt->schLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
216
/**
 * Look up the scheduler entry for sId and return it with mgmt->schLock held.
 *
 * On success the lock (taken with rwType) is still held; it is dropped by
 * qwReleaseScheduler. When the entry is missing the lock is released and nOpt
 * decides what happens:
 *   QW_NOT_EXIST_ADD     - create the entry and retry the lookup once
 *   QW_NOT_EXIST_RET_ERR - return TSDB_CODE_QRY_SCH_NOT_EXIST
 *   anything else        - log and return TSDB_CODE_QRY_APP_ERROR
 */
int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
  bool found = false;

  while (!found) {
    QW_LOCK(rwType, &mgmt->schLock);
    *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
    if (NULL != (*sch)) {
      found = true;  // leave with the lock held
      continue;
    }

    QW_UNLOCK(rwType, &mgmt->schLock);

    if (QW_NOT_EXIST_ADD == nOpt) {
      QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));

      // Only one add attempt: a second miss is reported as "not exist".
      nOpt = QW_NOT_EXIST_RET_ERR;
    } else if (QW_NOT_EXIST_RET_ERR == nOpt) {
      QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
    } else {
      QW_SCH_ELOG("unknown notExistOpt:%d", nOpt);
      QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      found = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
243 244
/* Acquire the scheduler entry for sId, creating it first when it does not exist. */
int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
  int32_t res = qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
  return res;
}

D
dapan1121 已提交
247 248
/* Acquire the scheduler entry for sId; a missing entry is reported as an error. */
int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
  int32_t res = qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
  return res;
}

dengyihao's avatar
dengyihao 已提交
251
/* Drop the mgmt->schLock that a successful scheduler acquire left held. */
void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
  QW_UNLOCK(rwType, &mgmt->schLock);
}
D
dapan1121 已提交
252

D
dapan1121 已提交
253
/**
 * Look up the status entry of task (qId, tId) in sch->tasksHash.
 *
 * On success *task points at the entry and sch->tasksLock (taken with rwType)
 * stays held; it is dropped by qwReleaseTaskStatus. On a miss the lock is
 * released and TSDB_CODE_QRY_TASK_NOT_EXIST is returned.
 */
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
  char taskKey[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(taskKey, qId, tId);

  QW_LOCK(rwType, &sch->tasksLock);

  *task = taosHashGet(sch->tasksHash, taskKey, sizeof(taskKey));
  if (NULL == (*task)) {
    QW_UNLOCK(rwType, &sch->tasksLock);
    QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
267
/**
 * Add a status entry for task (qId, tId) to sch->tasksHash.
 *
 * The new entry carries the given status and rId as its refId. When both
 * rwType and task are non-zero the entry (new or already existing) is
 * additionally acquired through qwAcquireTaskStatus.
 *
 * @return TSDB_CODE_SUCCESS on success,
 *         TSDB_CODE_QRY_TASK_ALREADY_EXIST when the entry exists and no acquire was requested,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY when the put fails for another reason,
 *         or the result of the acquire
 */
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
  int32_t code = 0;
  bool    wantAcquire = (rwType && task);

  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

  SQWTaskStatus ntask = {0};
  ntask.status = status;
  ntask.refId = rId;

  // The write lock only covers the put itself.
  QW_LOCK(QW_WRITE, &sch->tasksLock);
  code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
  QW_UNLOCK(QW_WRITE, &sch->tasksLock);

  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code));
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    } else if (wantAcquire) {
      QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
    } else {
      QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status));
      QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
    }
  }

  QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status));

  if (wantAcquire) {
    QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
303

D
dapan1121 已提交
304
/**
 * Register a status entry for task (qId, tId) under scheduler sId.
 *
 * The scheduler entry is created if missing and held with a read lock while
 * the task status is added; the lock is released before returning.
 */
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
  int32_t       code = 0;
  SQWSchStatus *schStatus = NULL;

  QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &schStatus));

  // No acquire requested: rwType 0 and a NULL out-pointer.
  QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), schStatus, 0, status, NULL));

_return:

  qwReleaseScheduler(QW_READ, mgmt);

  QW_RET(code);
}

dengyihao's avatar
dengyihao 已提交
318 319
/* Add a task status entry and acquire it; forwards to qwAddTaskStatusImpl. */
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status,
                               SQWTaskStatus **task) {
  int32_t res = qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
  return res;
}
D
dapan1121 已提交
322

dengyihao's avatar
dengyihao 已提交
323
/* Drop the sch->tasksLock that a successful task status acquire left held. */
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
  QW_UNLOCK(rwType, &sch->tasksLock);
}
D
dapan1121 已提交
324

D
dapan1121 已提交
325
/**
 * Acquire the context of task (qId, tId) from mgmt->ctxHash via taosHashAcquire.
 *
 * A successful acquire is paired with qwReleaseTaskCtx.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_TASK_CTX_NOT_EXIST when no entry is found
 */
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
  char ctxKey[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(ctxKey, qId, tId);

  *ctx = taosHashAcquire(mgmt->ctxHash, ctxKey, sizeof(ctxKey));
  if (NULL != (*ctx)) {
    return TSDB_CODE_SUCCESS;
  }

  QW_TASK_DLOG_E("task ctx not exist, may be dropped");
  QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
338
/**
 * Fetch the context of task (qId, tId) from mgmt->ctxHash via taosHashGet,
 * i.e. without the acquire step used by qwAcquireTaskCtx.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_TASK_CTX_NOT_EXIST when no entry is found
 */
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
  char ctxKey[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(ctxKey, qId, tId);

  *ctx = taosHashGet(mgmt->ctxHash, ctxKey, sizeof(ctxKey));
  if (NULL != (*ctx)) {
    return TSDB_CODE_SUCCESS;
  }

  QW_TASK_DLOG_E("task ctx not exist, may be dropped");
  QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
351
/**
 * Put a zero-initialised context for task (qId, tId) into mgmt->ctxHash.
 *
 * When ctx is non-NULL the entry (new or already existing) is returned
 * through it: acquired with qwAcquireTaskCtx if acquire is set, otherwise
 * looked up with qwGetTaskCtx. When ctx is NULL an already existing entry is
 * an error.
 *
 * @return TSDB_CODE_SUCCESS on success,
 *         TSDB_CODE_QRY_TASK_ALREADY_EXIST when the entry exists and ctx is NULL,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY when the put fails for another reason,
 *         or the result of the acquire/get
 */
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

  SQWTaskCtx nctx = {0};

  int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code);
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    } else if (NULL == ctx) {
      QW_TASK_ELOG_E("task ctx already exist");
      QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
    }
  }

  // Nothing to hand back to the caller.
  if (NULL == ctx) {
    return TSDB_CODE_SUCCESS;
  }

  if (acquire) {
    QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
  }

  QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
}

dengyihao's avatar
dengyihao 已提交
383
/* Add a task context without returning it; an already existing context is an error. */
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) {
  QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL));
}
D
dapan1121 已提交
384

dengyihao's avatar
dengyihao 已提交
385
/* Add a task context (or reuse the existing one) and acquire it into *ctx. */
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
  int32_t res = qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx);
  return res;
}
D
dapan1121 已提交
386

dengyihao's avatar
dengyihao 已提交
387
/* Release a context previously obtained with taosHashAcquire on mgmt->ctxHash. */
void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) {
  taosHashRelease(mgmt->ctxHash, ctx);
}
D
dapan1121 已提交
388

dengyihao's avatar
dengyihao 已提交
389
/**
 * Detach the executor task stored in *taskHandle and destroy it.
 *
 * Free and kill may race with each other, so the slot is cleared with a
 * compare-and-exchange and the task is destroyed only by the caller whose
 * exchange actually succeeded. The exchange returns the value it found in the
 * slot; success therefore means "returned value equals the expected one".
 * Testing the returned pointer merely for non-NULL would also fire when the
 * slot had been changed to a different handle.
 */
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
  if (NULL == otaskHandle) {
    return;
  }

  if (otaskHandle == atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
    qDestroyTask(otaskHandle);
  }
}
D
dapan1121 已提交
396

D
dapan1121 已提交
397 398
/**
 * Ask the executor to kill the task attached to ctx.
 *
 * Free and kill may race with each other. The handle is taken out of
 * ctx->taskHandle with a compare-and-exchange for the duration of the
 * qAsyncKillTask call and stored back afterwards. The kill and the store-back
 * run only when the exchange succeeded, i.e. when it returned the handle that
 * was loaded; a plain non-NULL test would also pass if the slot had meanwhile
 * been changed to another handle, which the store-back would then overwrite.
 *
 * @return the result of qAsyncKillTask, or 0 when there was no handle to kill
 */
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  int32_t code = 0;

  qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
  if (taskHandle && taskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
    code = qAsyncKillTask(taskHandle);
    atomic_store_ptr(&ctx->taskHandle, taskHandle);
  }

  QW_RET(code);
}

/**
 * Release the resources referenced by a task context: the control connection
 * handle, the executor task handle and the data sink handle. The released
 * fields are reset (handle to NULL, refId to -1, sinkHandle to NULL).
 */
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  SQWConnInfo *ctrlConn = &ctx->ctrlConnInfo;

  tmsgReleaseHandle(ctrlConn->handle, TAOS_CONN_SERVER);
  ctrlConn->handle = NULL;
  ctrlConn->refId = -1;

  // dataConnInfo is deliberately left untouched: it needs no release.

  qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);

  if (NULL == ctx->sinkHandle) {
    return;
  }

  dsDestroyDataSinker(ctx->sinkHandle);
  ctx->sinkHandle = NULL;
}
D
dapan1121 已提交
423

D
dapan1121 已提交
424
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
D
dapan1121 已提交
425 426 427
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
  SQWTaskCtx octx;
D
dapan1121 已提交
428

D
dapan1121 已提交
429 430
  SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == ctx) {
D
dapan1121 已提交
431
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
432
  }
D
dapan1121 已提交
433

D
dapan1121 已提交
434
  octx = *ctx;
D
dapan1121 已提交
435

D
dapan1121 已提交
436 437 438
  atomic_store_ptr(&ctx->taskHandle, NULL);
  atomic_store_ptr(&ctx->sinkHandle, NULL);

D
dapan1121 已提交
439 440
  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);

D
dapan1121 已提交
441
  if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
dengyihao's avatar
dengyihao 已提交
442
    QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
D
dapan1121 已提交
443
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
444 445
  }

D
dapan1121 已提交
446
  qwFreeTask(QW_FPARAMS(), &octx);
D
dapan1121 已提交
447 448

  QW_TASK_DLOG_E("task ctx dropped");
dengyihao's avatar
dengyihao 已提交
449

D
dapan1121 已提交
450 451 452
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
453
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
dengyihao's avatar
dengyihao 已提交
454
  SQWSchStatus * sch = NULL;
D
dapan1121 已提交
455
  SQWTaskStatus *task = NULL;
dengyihao's avatar
dengyihao 已提交
456 457
  int32_t        code = 0;

D
dapan1121 已提交
458 459
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
D
dapan1121 已提交
460

D
dapan1121 已提交
461
  if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
D
dapan1121 已提交
462
    QW_TASK_WLOG_E("scheduler does not exist");
D
dapan1121 已提交
463 464
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
465

D
dapan1121 已提交
466 467
  if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
    qwReleaseScheduler(QW_WRITE, mgmt);
dengyihao's avatar
dengyihao 已提交
468

D
dapan1121 已提交
469
    QW_TASK_WLOG_E("task does not exist");
D
dapan1121 已提交
470 471
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
472

D
dapan1121 已提交
473
  if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
D
dapan1121 已提交
474
    QW_TASK_ELOG_E("taosHashRemove task from hash failed");
D
dapan1121 已提交
475 476 477
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
478
  QW_TASK_DLOG_E("task status dropped");
D
dapan1121 已提交
479 480 481

_return:

D
dapan1121 已提交
482 483 484
  if (task) {
    qwReleaseTaskStatus(QW_WRITE, sch);
  }
D
dapan1121 已提交
485
  qwReleaseScheduler(QW_WRITE, mgmt);
dengyihao's avatar
dengyihao 已提交
486

D
dapan1121 已提交
487 488 489
  QW_RET(code);
}

D
dapan1121 已提交
490
/**
 * Set the status of task (qId, tId) under scheduler sId.
 *
 * The scheduler and the task status are acquired with read locks; the status
 * itself is changed through qwSetTaskStatus. Both locks are released before
 * returning.
 *
 * @return TSDB_CODE_SUCCESS, or the error of the failing acquire/set step
 */
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
  int32_t        code = 0;
  SQWSchStatus  *sch = NULL;
  SQWTaskStatus *task = NULL;

  QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));

  QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
  QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));

_return:

  // The task lock is released only when a task entry was obtained.
  if (NULL != task) {
    qwReleaseTaskStatus(QW_READ, sch);
  }
  qwReleaseScheduler(QW_READ, mgmt);

  QW_RET(code);
}

D
dapan1121 已提交
510
int32_t qwDropTask(QW_FPARAMS_DEF) {
H
Haojun Liao 已提交
511
  QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
D
dapan1121 已提交
512
  QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
H
Haojun Liao 已提交
513

D
dapan1121 已提交
514 515
  QW_TASK_DLOG_E("task is dropped");

D
dapan1121 已提交
516 517 518
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
519
/**
 * Post-processing once a task has finished executing.
 *
 * Only tasks of type TASK_TYPE_TEMP are handled: for an explain task the
 * execution info is fetched and sent over the control connection, and in all
 * cases the executor task handle is freed afterwards.
 *
 * @return TSDB_CODE_SUCCESS, or the error from fetching/sending the explain info
 */
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  if (TASK_TYPE_TEMP != ctx->taskType) {
    return TSDB_CODE_SUCCESS;
  }

  if (ctx->explain) {
    SExplainExecInfo *execInfo = NULL;
    int32_t           resNum = 0;
    QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));

    // Send through a private copy of the control connection info.
    SQWConnInfo connInfo = {0};
    connInfo.handle = ctx->ctrlConnInfo.handle;
    connInfo.refId = ctx->ctrlConnInfo.refId;

    QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
  }

  qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
541
/**
 * Run the executor task of ctx and push every produced block into its data sink.
 *
 * The loop ends when
 *   - the executor returns no block: the sink is closed with dsEndPut, the
 *     task is completed via qwHandleTaskComplete and *queryEnd (if given) is set;
 *   - the sink reports that execution should not continue;
 *   - a READY event was received and at least QW_DEFAULT_SHORT_RUN_TIMES rounds ran;
 *   - a FETCH event was received;
 *   - ctx->rspCode is non-zero.
 *
 * @param queryEnd optional; set to true only when the executor ran to completion
 * @return TSDB_CODE_SUCCESS, or the error from qExecTask, dsPutDataBlock or
 *         qwHandleTaskComplete
 */
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
  int32_t        code = 0;
  bool           qcontinue = true;
  SSDataBlock   *pRes = NULL;
  uint64_t       useconds = 0;
  int32_t        loopIdx = 0;
  int32_t        execNum = 0;
  DataSinkHandle sinkHandle = ctx->sinkHandle;

  for (;;) {
    QW_TASK_DLOG("start to execTask, loopIdx:%d", loopIdx++);

    code = qExecTask(ctx->taskHandle, &pRes, &useconds);
    if (code) {
      QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
    }

    ++execNum;

    if (NULL == pRes) {
      // No more blocks: close the sink and finish the task.
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);

      dsEndPut(sinkHandle, useconds);

      QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));

      if (queryEnd) {
        *queryEnd = true;
      }

      break;
    }

    int32_t rows = pRes->info.rows;

    ASSERT(pRes->info.rows > 0);

    SInputData inputData = {.pData = pRes};
    code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
    if (code) {
      QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
    }

    QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);

    // Exit conditions are checked in the same order as before; || short-circuits.
    if (!qcontinue ||
        (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) ||
        QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH) ||
        atomic_load_32(&ctx->rspCode)) {
      break;
    }
  }

  QW_RET(code);
}
D
dapan1121 已提交
608

D
dapan1121 已提交
609
/**
 * Fill hbInfo with the heartbeat data of one scheduler.
 *
 * Copies the scheduler's heartbeat connection info and epId, then builds
 * hbInfo->rsp.taskStatus with one STaskStatus (queryId, taskId, status, refId)
 * per entry of sch->tasksHash. The task hash is walked under a read lock on
 * sch->tasksLock.
 *
 * The status byte is read with atomic_load_8 because qwSetTaskStatus updates
 * it with an atomic compare-and-exchange from callers that also hold only a
 * read lock.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the status
 *         array cannot be created
 */
int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
  int32_t taskNum = 0;

  hbInfo->connInfo = sch->hbConnInfo;
  hbInfo->rsp.epId = sch->hbEpId;

  QW_LOCK(QW_READ, &sch->tasksLock);

  taskNum = taosHashGetSize(sch->tasksHash);

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
    QW_UNLOCK(QW_READ, &sch->tasksLock);
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  void       *key = NULL;
  size_t      keyLen = 0;
  STaskStatus status = {0};

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
    key = taosHashGetKey(pIter, &keyLen);

    // TODO GET EXECUTOR API TO GET MORE INFO

    // The hash key carries the query id and task id of the entry.
    QW_GET_QTID(key, status.queryId, status.taskId);
    status.status = atomic_load_8(&taskStatus->status);
    status.refId = taskStatus->refId;

    taosArrayPush(hbInfo->rsp.taskStatus, &status);

    pIter = taosHashIterate(sch->tasksHash, pIter);
  }

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
653
/**
 * Build a fetch response from whatever the data sink of ctx currently holds.
 *
 * Three outcomes, depending on the length reported by dsGetDataLength:
 *   - no data and the query is not finished: only pOutput->bufStatus is set
 *     to DS_BUF_EMPTY; *rspMsg and *dataLen are left untouched;
 *   - no data and the query is finished: the sink is drained once, the task
 *     is marked SUCCEED and an empty response is returned (*dataLen = 0);
 *   - data available: a response of that length is allocated, the block is
 *     read into it and, if the sink is then empty and the query ended, the
 *     task is marked SUCCEED.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a negative
 *         length, or the error from qwMallocFetchRsp / dsGetDataBlock
 */
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
  int32_t            code = 0;
  int32_t            len = 0;
  bool               queryEnd = false;
  SRetrieveTableRsp *rsp = NULL;

  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);

  if (len < 0) {
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Sink is empty but the query is still running.
  if (0 == len && !queryEnd) {
    pOutput->bufStatus = DS_BUF_EMPTY;

    return TSDB_CODE_SUCCESS;
  }

  // Sink is empty and the query has finished.
  if (0 == len) {
    code = dsGetDataBlock(ctx->sinkHandle, pOutput);
    if (code) {
      QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
    }

    QW_TASK_DLOG_E("no data in sink and query end");

    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
    QW_ERR_RET(qwMallocFetchRsp(len, &rsp));

    *rspMsg = rsp;
    *dataLen = 0;
    return TSDB_CODE_SUCCESS;
  }

  // Got data from sink
  QW_TASK_DLOG("there are data in sink, dataLength:%d", len);

  *dataLen = len;

  QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
  *rspMsg = rsp;

  // The block is written straight into the response payload.
  pOutput->pData = rsp->data;
  code = dsGetDataBlock(ctx->sinkHandle, pOutput);
  if (code) {
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_RET(code);
  }

  if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
    QW_TASK_DLOG_E("task all data fetched, done");
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
712
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
dengyihao's avatar
dengyihao 已提交
713 714
  int32_t      code = 0;
  SQWTaskCtx * ctx = NULL;
D
dapan1121 已提交
715 716
  SQWConnInfo *dropConnection = NULL;
  SQWConnInfo *cancelConnection = NULL;
D
dapan1121 已提交
717

D
dapan1121 已提交
718
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
719 720 721 722 723 724

  if (QW_PHASE_PRE_QUERY == phase) {
    QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
  } else {
    QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
  }
dengyihao's avatar
dengyihao 已提交
725

D
dapan1121 已提交
726
  QW_LOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
727

D
dapan1121 已提交
728
  if (QW_PHASE_PRE_FETCH == phase) {
dengyihao's avatar
dengyihao 已提交
729
    atomic_store_8((int8_t *)&ctx->queryFetched, true);
D
dapan1121 已提交
730
  } else {
D
dapan1121 已提交
731 732
    atomic_store_8(&ctx->phase, phase);
  }
D
dapan1121 已提交
733

dengyihao's avatar
dengyihao 已提交
734
  if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
D
dapan1121 已提交
735 736 737
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }
D
dapan1121 已提交
738

D
dapan1121 已提交
739 740
  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
D
dapan1121 已提交
741
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
H
Haojun Liao 已提交
742
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
743
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
744 745
        break;
      }
D
dapan1121 已提交
746

D
dapan1121 已提交
747
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
748
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
749
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
750
        dropConnection = NULL;
751

D
dapan1121 已提交
752 753
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
754

D
dapan1121 已提交
755
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan 已提交
756
        break;
D
dapan1121 已提交
757
      }
D
dapan1121 已提交
758

D
dapan1121 已提交
759
      QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
D
dapan1121 已提交
760 761
      break;
    }
D
dapan1121 已提交
762
    case QW_PHASE_PRE_FETCH: {
D
dapan1121 已提交
763 764
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
765 766
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }
D
dapan1121 已提交
767

D
dapan1121 已提交
768
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
769
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
770 771 772 773
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

      if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
774
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
775
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
D
dapan1121 已提交
776 777
      }
      break;
dengyihao's avatar
dengyihao 已提交
778
    }
D
dapan1121 已提交
779 780
    case QW_PHASE_PRE_CQUERY: {
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
781
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
782
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
783
      }
D
dapan1121 已提交
784

D
dapan1121 已提交
785
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
786
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
787
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
788
        dropConnection = NULL;
H
Haojun Liao 已提交
789

D
dapan1121 已提交
790 791
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
792

D
dapan1121 已提交
793
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
794
      }
D
dapan1121 已提交
795

D
dapan1121 已提交
796
      break;
D
dapan1121 已提交
797 798 799 800 801 802 803
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
dengyihao's avatar
dengyihao 已提交
804 805
    QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
D
dapan1121 已提交
806
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
807
  }
D
dapan1121 已提交
808

D
dapan1121 已提交
809
_return:
D
dapan1121 已提交
810

D
dapan1121 已提交
811
  if (ctx) {
D
dapan1121 已提交
812
    QW_UPDATE_RSP_CODE(ctx, code);
dengyihao's avatar
dengyihao 已提交
813

D
dapan1121 已提交
814
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
815 816
    qwReleaseTaskCtx(mgmt, ctx);
  }
D
dapan1121 已提交
817

D
dapan1121 已提交
818
  if (dropConnection) {
S
shm  
Shengliang Guan 已提交
819
    qwBuildAndSendDropRsp(dropConnection, code);
D
dapan1121 已提交
820
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
821
  }
D
dapan1121 已提交
822

D
dapan1121 已提交
823
  if (cancelConnection) {
S
shm  
Shengliang Guan 已提交
824
    qwBuildAndSendCancelRsp(cancelConnection, code);
D
dapan1121 已提交
825
    QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
826 827
  }

D
dapan1121 已提交
828
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
829 830 831 832 833

  QW_RET(code);
}

/*
 * Post-phase bookkeeping for a task, run after a query / continue-query / fetch step.
 *
 * Under the task context write lock it:
 *   - fails with TSDB_CODE_QRY_TASK_DROPPED if the drop event was already processed;
 *   - for QW_PHASE_POST_QUERY, marks the READY event processed and prepares a ready response;
 *   - if a drop event is pending, answers it and drops the task (not allowed at POST_FETCH);
 *   - propagates a previously recorded ctx->rspCode, then the caller supplied input->code.
 *
 * On exit the resulting code is merged into the context, the phase is recorded (except for
 * POST_FETCH), and the task status is moved to PARTIAL_SUCCEED / FAILED as appropriate.
 *
 * input  - carries the result code of the phase that just ran (input->code).
 * output - not used by this function.
 * Returns the final code for the phase.
 */
int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
  int32_t      code = 0;
  SQWTaskCtx  *ctx = NULL;
  SQWConnInfo  readyConnCopy = {0};
  SQWConnInfo *pReadyConn = NULL;

  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (QW_PHASE_POST_QUERY == phase) {
    // Copy the control connection: the ready response is sent below, after the
    // context has been unlocked and released.
    readyConnCopy.handle = ctx->ctrlConnInfo.handle;
    readyConnCopy.refId = ctx->ctrlConnInfo.refId;
    pReadyConn = &readyConnCopy;

    QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
  }

  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    if (QW_PHASE_POST_FETCH == phase) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    // The drop response goes out first (code is still 0 here), then the task is dropped.
    qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));

    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (0 != ctx->rspCode) {
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
    QW_ERR_JRET(ctx->rspCode);
  }

  QW_ERR_JRET(input->code);

_return:

  if (QW_PHASE_POST_QUERY == phase && TSDB_CODE_SUCCESS == code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
  }

  if (NULL != ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);

    // A fetch does not change the recorded phase.
    if (QW_PHASE_POST_FETCH != phase) {
      atomic_store_8(&ctx->phase, phase);
    }

    QW_UNLOCK(QW_WRITE, &ctx->lock);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  if (NULL != pReadyConn) {
    qwBuildAndSendReadyRsp(pReadyConn, code);
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", pReadyConn->handle, code, tstrerror(code));
  }

  if (0 != code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
  }

  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));

  QW_RET(code);
}

D
dapan1121 已提交
917
/*
 * Handles a query message: registers the broken-link callback, runs the pre-query phase,
 * records the request attributes and control connection on the task context, parses the
 * subplan string, creates the executor task and runs it.
 *
 * Whatever happens, the post-query phase handler is invoked with the resulting code.
 * This function itself always returns TSDB_CODE_SUCCESS.
 */
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
  int32_t          code = 0;
  SQWTaskCtx      *ctx = NULL;
  struct SSubplan *plan = NULL;
  qTaskInfo_t      pTaskInfo = NULL;
  DataSinkHandle   sinkHandle = NULL;
  SQWPhaseInput    input = {0};

  QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  // Record what kind of task this is and where control responses must go.
  atomic_store_8(&ctx->taskType, taskType);
  atomic_store_8(&ctx->explain, explain);

  atomic_store_ptr(&ctx->ctrlConnInfo.handle, qwMsg->connInfo.handle);
  atomic_store_ptr(&ctx->ctrlConnInfo.ahandle, qwMsg->connInfo.ahandle);
  atomic_store_64(&ctx->ctrlConnInfo.refId, qwMsg->connInfo.refId);

  QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);

  code = qStringToSubplan(qwMsg->msg, &plan);
  if (TSDB_CODE_SUCCESS != code) {
    QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
  if (0 != code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  // A successful create must yield both handles.
  if (NULL == pTaskInfo || NULL == sinkHandle) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

  // Both handles are known to be non-NULL at this point.
  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));

_return:

  // Hand the outcome to the post-query phase handler.
  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
983
/*
 * Handles a ready message for a task.
 *
 *  - Task dropping/dropped            -> TSDB_CODE_QRY_TASK_DROPPED.
 *  - Task still in QW_PHASE_PRE_QUERY -> remember the connection, mark READY as received
 *                                        and do not respond from here.
 *  - Task in QW_PHASE_POST_QUERY      -> respond with the task's recorded rspCode.
 *  - Anything else                    -> error response.
 *
 * Unless the response was deferred, a ready response carrying the resulting code is sent
 * on the message's own connection. The function always returns TSDB_CODE_SUCCESS.
 */
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;
  bool        rspNow = true;

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG_E("task is dropping or already dropped");
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (QW_PHASE_PRE_QUERY == ctx->phase) {
    // Query has not finished its first run yet: park the connection on the context.
    ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
    ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
    rspNow = false;
    QW_TASK_DLOG_E("ready msg will not rsp now");
    goto _return;
  }

  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);

  if (atomic_load_8((int8_t *)&ctx->queryEnd) || atomic_load_8((int8_t *)&ctx->queryFetched)) {
    QW_TASK_ELOG("got ready msg at wrong status, queryEnd:%d, queryFetched:%d", atomic_load_8((int8_t *)&ctx->queryEnd),
                 atomic_load_8((int8_t *)&ctx->queryFetched));
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }

  if (QW_PHASE_POST_QUERY == ctx->phase) {
    code = ctx->rspCode;
    goto _return;
  }

  QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase));

  QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);

_return:

  if (0 != code) {
    if (NULL != ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }

    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
  }

  if (NULL != ctx) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  if (rspNow) {
    qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  }

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
1047
/*
 * Handles a continue-query message: repeatedly runs the task until the query ends, an
 * error occurs, or nobody asked for the query to continue.
 *
 * Each round:
 *   1. runs the pre-cquery phase and (re)fetches the task context;
 *   2. clears queryInQueue / queryContinue and executes the task;
 *   3. if a fetch is pending, pulls a result from the sink and, when one is available,
 *      answers the pending fetch on the stored data connection;
 *   4. on error with a fetch pending, answers that fetch with the error code.
 *
 * After the loop the post-cquery phase handler is invoked with the last code.
 * The function always returns TSDB_CODE_SUCCESS.
 */
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  SQWTaskCtx   *ctx = NULL;
  int32_t       code = 0;
  SQWPhaseInput input = {0};
  void         *rsp = NULL;
  int32_t       dataLen = 0;
  bool          queryEnd = false;

  do {
    // NOTE(review): on a later round, if this call fails, `ctx` still holds the pointer
    // obtained in the previous round and is used by the error path below - confirm it
    // is guaranteed to stay valid.
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));

    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

    atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t *)&ctx->queryContinue, 0);

    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));

    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      SOutputData sOutput = {0};
      QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));

      if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));

        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
      }

      if (rsp) {
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);

        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
        if (qComplete) {
          atomic_store_8((int8_t *)&ctx->queryEnd, true);
        }

        qwMsg->connInfo = ctx->dataConnInfo;
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

        qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
                     tstrerror(code), dataLen);

        // The buffer has been handed to the send path (nothing here frees it after a
        // successful send). Forget it so that a failure in a later round cannot pass the
        // same pointer to qwFreeFetchRsp() or treat it as a fresh result.
        rsp = NULL;
        dataLen = 0;
      } else {
        // Fetch is waiting but no data yet: keep the query going.
        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
      }
    }

  _return:

    if (NULL == ctx) {
      break;
    }

    if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      // Answer the pending fetch with the error; any unsent result buffer is discarded.
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
      qwFreeFetchRsp(rsp);
      rsp = NULL;

      qwMsg->connInfo = ctx->dataConnInfo;
      qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                   0);
    }

    QW_LOCK(QW_WRITE, &ctx->lock);
    if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
      // Note: if necessary, fetch need to put cquery to queue again
      atomic_store_8(&ctx->phase, 0);
      QW_UNLOCK(QW_WRITE, &ctx->lock);
      break;
    }
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  } while (true);

  input.code = code;
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);

  QW_RET(TSDB_CODE_SUCCESS);
}
D
dapan1121 已提交
1126

D
dapan1121 已提交
1127
/*
 * Handles a fetch message.
 *
 * After the pre-fetch phase it asks the sink for a result:
 *   - no result yet -> the requester's connection is stored on the context and the FETCH
 *                      event is marked as received;
 *   - result ready  -> the fetch response is built (and queryEnd is set when the sink is
 *                      empty and the query has ended).
 *
 * If the query has not ended and the sink buffer is low or empty, the query is asked to
 * continue: either by flagging the running query, or by queuing a continue-query message.
 *
 * The post-fetch phase handler then runs; on error any built response is discarded and an
 * error response is sent instead. The function always returns TSDB_CODE_SUCCESS.
 */
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  int32_t       code = 0;
  SQWTaskCtx   *ctx = NULL;
  SQWPhaseInput input = {0};
  SOutputData   sOutput = {0};
  void         *rsp = NULL;
  int32_t       dataLen = 0;
  bool          ctxLocked = false;

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));

  if (NULL != rsp) {
    bool fetchDone = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);

    qwBuildFetchRsp(rsp, &sOutput, dataLen, fetchDone);
    if (fetchDone) {
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
    }
  } else {
    // Nothing to return yet: remember who is waiting so the response can be sent later.
    atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle);
    atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle);

    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
  }

  if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));

    QW_LOCK(QW_WRITE, &ctx->lock);
    ctxLocked = true;

    // RC WARNING
    if (QW_IS_QUERY_RUNNING(ctx)) {
      atomic_store_8((int8_t *)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
      // No query running and none queued: schedule a continue-query.
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);

      atomic_store_8((int8_t *)&ctx->queryInQueue, 1);

      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
    }
  }

_return:

  if (ctxLocked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);

  if (0 != code) {
    // The built response (if any) must not be delivered on failure.
    qwFreeFetchRsp(rsp);
    rsp = NULL;
    dataLen = 0;
  }

  if (0 != code || NULL != rsp) {
    qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
    QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                 dataLen);
  }

  QW_RET(TSDB_CODE_SUCCESS);
}
D
dapan1121 已提交
1197

D
dapan1121 已提交
1198
/*
 * Handles a drop message for a task.
 *
 *  - A drop already pending  -> TSDB_CODE_QRY_DUPLICATTED_OPERATION.
 *  - Query currently running -> kill the task handle, move the status to DROPPING and
 *                               leave the DROP event pending.
 *  - Task idle (phase > 0)   -> respond (when qwMsg->code is 0) and drop the task now.
 *  - Task not started        -> just leave the DROP event pending.
 *
 * When the DROP event is left pending, the requester's connection is stored on the
 * context first. On failure an error response is sent when qwMsg->code is 0.
 * The function always returns TSDB_CODE_SUCCESS.
 */
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;
  bool        ctxLocked = false;
  bool        dropRspSent = false;

  // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED

  QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);

  ctxLocked = true;

  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG_E("task already dropping");
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

  if (QW_IS_QUERY_RUNNING(ctx)) {
    QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
  } else if (ctx->phase > 0) {
    // NOTE(review): the response is sent before qwDropTask(); if the drop then fails, the
    // error path below sends a second response on the same connection - confirm intended.
    if (0 == qwMsg->code) {
      qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
      QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
    }

    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    dropRspSent = true;
  }
  // otherwise: task not started

  if (!dropRspSent) {
    // Remember the connection and leave the DROP event pending on the context.
    ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
    ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;

    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }

_return:

  if (0 != code) {
    if (NULL != ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }

    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
  }

  if (ctxLocked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  if (NULL != ctx) {
    qwReleaseTaskCtx(mgmt, ctx);
  }

  if ((0 == qwMsg->code) && (TSDB_CODE_SUCCESS != code)) {
    qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  }

  QW_RET(TSDB_CODE_SUCCESS);
}
D
dapan1121 已提交
1264

D
dapan1121 已提交
1265
/*
 * Handles a broken heartbeat link reported for scheduler req->sId.
 *
 * If the broken connection is the one currently stored for the scheduler, its handle is
 * released and cleared; otherwise the notification is ignored.
 * Returns the error from qwAcquireAddScheduler(), else TSDB_CODE_SUCCESS.
 */
int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t       code = 0;
  SQWSchStatus *sch = NULL;

  QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

  QW_LOCK(QW_WRITE, &sch->hbConnLock);

  if (qwMsg->connInfo.handle != sch->hbConnInfo.handle) {
    // The scheduler holds a different connection than the one that broke.
    QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
  } else {
    tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
    sch->hbConnInfo.handle = NULL;
    sch->hbConnInfo.ahandle = NULL;

    QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
  }

  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);

  qwReleaseScheduler(QW_READ, mgmt);

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
1291
/*
 * Handles a scheduler heartbeat message.
 *
 * A message carrying a non-zero qwMsg->code is treated as a broken-link notification and
 * delegated to qwProcessHbLinkBroken(). Otherwise the scheduler entry is acquired (added
 * if missing), the broken-link callback is registered, and the scheduler's heartbeat
 * connection and endpoint are replaced by the ones from this message (releasing the
 * previously stored handle, if any).
 *
 * A heartbeat response carrying the resulting code is always sent; on failure the
 * message's connection handle is released afterwards.
 * Returns TSDB_CODE_SUCCESS on the normal path.
 */
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t         code = 0;
  SSchedulerHbRsp rsp = {0};
  SQWSchStatus   *sch = NULL;

  if (qwMsg->code) {
    QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
  }

  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

  code = qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo);
  if (TSDB_CODE_SUCCESS != code) {
    // The scheduler was acquired above; give it back before bailing out so the
    // acquire/release pair stays balanced on this error path.
    qwReleaseScheduler(QW_READ, mgmt);
    QW_ERR_JRET(code);
  }

  QW_LOCK(QW_WRITE, &sch->hbConnLock);

  // Drop the handle of the connection being replaced.
  if (sch->hbConnInfo.handle) {
    tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
  }

  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));

  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);

  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
          req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);

  qwReleaseScheduler(QW_READ, mgmt);

_return:

  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));

  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);

  if (code) {
    tmsgReleaseHandle(qwMsg->connInfo.handle, TAOS_CONN_SERVER);
  }

  QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));

  QW_RET(TSDB_CODE_SUCCESS);
}

void qwProcessHbTimerEvent(void *param, void *tmrId) {
D
dapan1121 已提交
1336 1337
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
  SQWSchStatus *sch = NULL;
dengyihao's avatar
dengyihao 已提交
1338 1339 1340
  int32_t       taskNum = 0;
  SQWHbInfo *   rspList = NULL;
  int32_t       code = 0;
D
dapan1121 已提交
1341

D
dapan1121 已提交
1342 1343
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
1344 1345 1346 1347 1348
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1349 1350
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
    return;
D
dapan1121 已提交
1351 1352
  }

wafwerar's avatar
wafwerar 已提交
1353
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
1354 1355
  if (NULL == rspList) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1356 1357 1358
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
    return;
D
dapan1121 已提交
1359 1360
  }

dengyihao's avatar
dengyihao 已提交
1361 1362
  void *  key = NULL;
  size_t  keyLen = 0;
D
dapan1121 已提交
1363 1364 1365 1366
  int32_t i = 0;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
D
dapan1121 已提交
1367 1368 1369 1370 1371 1372 1373
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
    if (NULL == sch->hbConnInfo.handle) {
      uint64_t *sId = taosHashGetKey(pIter, NULL);
      QW_DLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
      pIter = taosHashIterate(mgmt->schHash, pIter);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1374

D
dapan1121 已提交
1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
1390
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
1391 1392
    QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),
            (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
D
dapan1121 已提交
1393
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
1394 1395
  }

wafwerar's avatar
wafwerar 已提交
1396
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
1397

dengyihao's avatar
dengyihao 已提交
1398
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1399 1400
}

S
Shengliang Guan 已提交
1401 1402
/*
 * Creates and initializes a query worker manager.
 *
 * nodeType, nodeId - identity recorded on the manager.
 * cfg              - optional limits; NULL or zero-valued fields fall back to the
 *                    QW_DEFAULT_* values.
 * qWorkerMgmt      - out: receives the new manager on success (untouched on failure).
 * pMsgCb           - message callbacks, copied into the manager; must be non-NULL and
 *                    have a non-NULL pWrapper.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for bad arguments, or
 * TSDB_CODE_QRY_OUT_OF_MEMORY when any allocation / hash / timer setup fails. On failure
 * everything created so far is torn down.
 */
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
  if (NULL == qWorkerMgmt || NULL == pMsgCb || pMsgCb->pWrapper == NULL) {
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SQWorkerMgmt *mgmt = taosMemoryCalloc(1, sizeof(SQWorkerMgmt));
  if (NULL == mgmt) {
    qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // mgmt is zero-filled, so without a cfg every limit is 0 and picks up its default below.
  if (cfg) {
    mgmt->cfg = *cfg;
  }
  if (0 == mgmt->cfg.maxSchedulerNum) {
    mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
  }
  if (0 == mgmt->cfg.maxTaskNum) {
    mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
  }
  if (0 == mgmt->cfg.maxSchTaskNum) {
    mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
  }

  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                               HASH_ENTRY_LOCK);
  if (NULL == mgmt->schHash) {
    // Do not free mgmt here: it is still read by the log call and by the shared
    // cleanup at _return, which frees it exactly once.
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->ctxHash =
      taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == mgmt->ctxHash) {
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer);
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
  mgmt->msgCb = *pMsgCb;

  *qWorkerMgmt = mgmt;

  qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);

  return TSDB_CODE_SUCCESS;

_return:

  // Single teardown path; members that were never created are still NULL from calloc.
  taosHashCleanup(mgmt->schHash);
  taosHashCleanup(mgmt->ctxHash);

  taosTmrCleanUp(mgmt->timer);

  taosMemoryFreeClear(mgmt);

  QW_RET(code);
}
D
dapan1121 已提交
1479 1480 1481 1482

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1483
  }
D
dapan 已提交
1484

D
dapan1121 已提交
1485
  SQWorkerMgmt *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1486 1487 1488

  taosTmrStopA(&mgmt->hbTimer);
  taosTmrCleanUp(mgmt->timer);
D
dapan1121 已提交
1489

dengyihao's avatar
dengyihao 已提交
1490 1491 1492
  // TODO STOP ALL QUERY

  // TODO FREE ALL
D
dapan1121 已提交
1493

D
dapan1121 已提交
1494 1495 1496
  taosHashCleanup(mgmt->ctxHash);
  taosHashCleanup(mgmt->schHash);

wafwerar's avatar
wafwerar 已提交
1497
  taosMemoryFreeClear(*qWorkerMgmt);
D
dapan1121 已提交
1498
}
D
dapan1121 已提交
1499

D
dapan1121 已提交
1500
/**
 * Placeholder for reporting the status of every task that belongs to one scheduler.
 *
 * The real implementation is currently disabled. As written, the function
 * reads none of its arguments, leaves *rsp untouched and always returns
 * TSDB_CODE_SUCCESS. The disabled body is preserved below as reference.
 *
 * @param mgmt  Query worker manager (unused while the body is disabled).
 * @param sId   Scheduler id (unused while the body is disabled).
 * @param rsp   Output response pointer; NOT written while the body is disabled.
 * @return      Always TSDB_CODE_SUCCESS.
 */
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
  /*
    Disabled reference implementation:

    SQWSchStatus *sch = NULL;
    int32_t taskNum = 0;

    QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));

    sch->lastAccessTs = taosGetTimestampSec();

    QW_LOCK(QW_READ, &sch->tasksLock);

    taskNum = taosHashGetSize(sch->tasksHash);

    int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
    *rsp = taosMemoryCalloc(1, size);
    if (NULL == *rsp) {
      QW_SCH_ELOG("calloc %d failed", size);
      QW_UNLOCK(QW_READ, &sch->tasksLock);
      qwReleaseScheduler(QW_READ, mgmt);

      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }

    void *key = NULL;
    size_t keyLen = 0;
    int32_t i = 0;

    void *pIter = taosHashIterate(sch->tasksHash, NULL);
    while (pIter) {
      SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
      taosHashGetKey(pIter, &key, &keyLen);

      QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
      (*rsp)->status[i].status = taskStatus->status;

      ++i;
      pIter = taosHashIterate(sch->tasksHash, pIter);
    }

    QW_UNLOCK(QW_READ, &sch->tasksLock);
    qwReleaseScheduler(QW_READ, mgmt);

    (*rsp)->num = taskNum;
  */
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1546

D
dapan1121 已提交
1547 1548
/**
 * Placeholder for refreshing a scheduler's last-access timestamp.
 *
 * The real implementation is currently disabled. As written, the function
 * only declares an unused local and returns TSDB_CODE_SUCCESS without
 * touching any of its arguments. The disabled body is preserved below.
 *
 * @param mgmt  Query worker manager (unused while the body is disabled).
 * @param sId   Scheduler id (unused while the body is disabled).
 * @param qId   Query id (unused).
 * @param tId   Task id (unused).
 * @return      Always TSDB_CODE_SUCCESS.
 */
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
  // Only referenced by the disabled code below.
  SQWSchStatus *sch = NULL;

  /*
    Disabled reference implementation:

    QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));

    sch->lastAccessTs = taosGetTimestampSec();

    qwReleaseScheduler(QW_READ, mgmt);
  */

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1560
/**
 * Placeholder for looking up the status of a single task.
 *
 * The real implementation is currently disabled. As written, the function
 * declares its locals and passes `code` (initialised to 0 and never changed)
 * to QW_RET. The disabled body is preserved below as reference; note that it
 * refers to `queryId` / `taskId`, which are not the names of the current
 * parameters (`qId` / `tId`).
 *
 * NOTE(review): *taskStatus is never written while the body is disabled, so a
 * caller that reads it afterwards sees whatever it held before the call.
 *
 * @param mgmt        Query worker manager (unused while the body is disabled).
 * @param sId         Scheduler id (unused while the body is disabled).
 * @param qId         Query id (unused while the body is disabled).
 * @param tId         Task id (unused while the body is disabled).
 * @param taskStatus  Output status; NOT written while the body is disabled.
 * @return            Whatever QW_RET yields for a code of 0.
 */
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
  int32_t code = 0;

  // Only referenced by the disabled code below.
  SQWSchStatus  *sch = NULL;
  SQWTaskStatus *task = NULL;

  /*
    Disabled reference implementation:

    if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
      *taskStatus = JOB_TASK_STATUS_NULL;
      return TSDB_CODE_SUCCESS;
    }

    if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) {
      qwReleaseScheduler(QW_READ, mgmt);

      *taskStatus = JOB_TASK_STATUS_NULL;
      return TSDB_CODE_SUCCESS;
    }

    *taskStatus = task->status;

    qwReleaseTask(QW_READ, sch);
    qwReleaseScheduler(QW_READ, mgmt);
  */

  QW_RET(code);
}

D
dapan1121 已提交
1587
/**
 * Placeholder for cancelling a task.
 *
 * The real implementation is currently disabled. As written, the function
 * declares its locals and passes `code` (initialised to 0 and never changed)
 * to QW_RET; no task or scheduler state is touched. The disabled body is
 * preserved below as reference.
 *
 * @param mgmt  Query worker manager (unused while the body is disabled).
 * @param sId   Scheduler id (unused while the body is disabled).
 * @param qId   Query id (unused while the body is disabled).
 * @param tId   Task id (unused while the body is disabled).
 * @return      Whatever QW_RET yields for a code of 0.
 */
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
  int32_t code = 0;

  // Only referenced by the disabled code below.
  SQWSchStatus  *sch = NULL;
  SQWTaskStatus *task = NULL;

  /*
    Disabled reference implementation:

    QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));

    QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));

    QW_LOCK(QW_WRITE, &task->lock);

    task->cancel = true;

    int8_t oriStatus = task->status;
    int8_t newStatus = 0;

    if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START ||
        task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) {
      QW_UNLOCK(QW_WRITE, &task->lock);

      qwReleaseTask(QW_READ, sch);
      qwReleaseScheduler(QW_READ, mgmt);

      return TSDB_CODE_SUCCESS;
    } else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED ||
               task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
    } else {
      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLING));
    }

    QW_UNLOCK(QW_WRITE, &task->lock);

    qwReleaseTask(QW_READ, sch);
    qwReleaseScheduler(QW_READ, mgmt);

    if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
      //TODO call executer to cancel subquery async
    }

    return TSDB_CODE_SUCCESS;

  _return:

    if (task) {
      QW_UNLOCK(QW_WRITE, &task->lock);

      qwReleaseTask(QW_READ, sch);
    }

    if (sch) {
      qwReleaseScheduler(QW_READ, mgmt);
    }
  */

  QW_RET(code);
}