qworker.c 48.3 KB
Newer Older
D
dapan1121 已提交
1
#include "qworker.h"
dengyihao's avatar
dengyihao 已提交
2
#include "dataSinkMgt.h"
3
#include "executor.h"
D
dapan1121 已提交
4
#include "planner.h"
H
Haojun Liao 已提交
5 6
#include "query.h"
#include "qworkerInt.h"
D
dapan1121 已提交
7
#include "qworkerMsg.h"
dengyihao's avatar
dengyihao 已提交
8
#include "tcommon.h"
H
Haojun Liao 已提交
9
#include "tmsg.h"
10
#include "tname.h"
D
dapan1121 已提交
11

D
dapan1121 已提交
12
// Debug switches for this file: statusEnable gates qwDbgValidateStatus(), dumpEnable gates
// qwDbgDumpMgmtInfo(). Both are turned on here.
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true};
D
dapan1121 已提交
13 14 15 16 17
// Global qworker management state, initialised with lock 0, qwRef -1 and qwNum 0.
// NOTE(review): -1 presumably marks qwRef as "not created yet" — confirm against the code that opens it.
SQWorkerMgmt gQwMgmt = {
  .lock  = 0,
  .qwRef = -1, 
  .qwNum = 0,
};
18

D
dapan1121 已提交
19 20 21
/*
 * Debug-time check that changing a task status from oriStatus to newStatus is an allowed transition.
 *
 * Nothing is checked (success is returned) when gQWDebug.statusEnable is off.
 *
 * oriStatus  status the task currently has
 * newStatus  status the caller wants to set
 * ignore     set to true only for a redundant EXECUTING->EXECUTING or FAILED->FAILED request,
 *            telling the caller to skip the update; left untouched in every other case
 *
 * Returns TSDB_CODE_SUCCESS when the transition is allowed, TSDB_CODE_QRY_APP_ERROR otherwise.
 */
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  int32_t code = 0;
  bool    allowed = false;

  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
  }

  // Same-status updates are tolerated only for EXECUTING and FAILED.
  if (oriStatus == newStatus) {
    if (JOB_TASK_STATUS_EXECUTING != newStatus && JOB_TASK_STATUS_FAILED != newStatus) {
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    *ignore = true;
    return TSDB_CODE_SUCCESS;
  }

  // For each source status, decide whether newStatus is one of the permitted targets.
  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
      allowed = (JOB_TASK_STATUS_EXECUTING == newStatus || JOB_TASK_STATUS_FAILED == newStatus ||
                 JOB_TASK_STATUS_NOT_START == newStatus);
      break;

    case JOB_TASK_STATUS_NOT_START:
      allowed = (JOB_TASK_STATUS_CANCELLED == newStatus);
      break;

    case JOB_TASK_STATUS_EXECUTING:
      allowed = (JOB_TASK_STATUS_PARTIAL_SUCCEED == newStatus || JOB_TASK_STATUS_SUCCEED == newStatus ||
                 JOB_TASK_STATUS_FAILED == newStatus || JOB_TASK_STATUS_CANCELLING == newStatus ||
                 JOB_TASK_STATUS_CANCELLED == newStatus || JOB_TASK_STATUS_DROPPING == newStatus);
      break;

    case JOB_TASK_STATUS_PARTIAL_SUCCEED:
      allowed = (JOB_TASK_STATUS_EXECUTING == newStatus || JOB_TASK_STATUS_SUCCEED == newStatus ||
                 JOB_TASK_STATUS_CANCELLED == newStatus || JOB_TASK_STATUS_FAILED == newStatus ||
                 JOB_TASK_STATUS_DROPPING == newStatus);
      break;

    case JOB_TASK_STATUS_SUCCEED:
      allowed = (JOB_TASK_STATUS_CANCELLED == newStatus || JOB_TASK_STATUS_DROPPING == newStatus ||
                 JOB_TASK_STATUS_FAILED == newStatus);
      break;

    case JOB_TASK_STATUS_FAILED:
      allowed = (JOB_TASK_STATUS_CANCELLED == newStatus || JOB_TASK_STATUS_DROPPING == newStatus);
      break;

    case JOB_TASK_STATUS_CANCELLING:
      allowed = (JOB_TASK_STATUS_CANCELLED == newStatus);
      break;

    case JOB_TASK_STATUS_CANCELLED:
    case JOB_TASK_STATUS_DROPPING:
      allowed = (JOB_TASK_STATUS_FAILED == newStatus || JOB_TASK_STATUS_PARTIAL_SUCCEED == newStatus);
      break;

    default:
      // Unknown source status: reported with its own message, bypassing the common error exit.
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
      return TSDB_CODE_QRY_APP_ERROR;
  }

  if (!allowed) {
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  return TSDB_CODE_SUCCESS;

_return:

  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  QW_RET(code);
}

dengyihao's avatar
dengyihao 已提交
104
/* Per-scheduler dump hook called by qwDbgDumpMgmtInfo(); currently an empty stub. */
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {
}
D
dapan1121 已提交
105

D
dapan1121 已提交
106
/*
 * Dump a summary of the worker's bookkeeping: the number of scheduler entries, one
 * qwDbgDumpSchInfo() call per entry, and the number of task contexts.
 * Does nothing when gQWDebug.dumpEnable is off.
 */
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
  if (!gQWDebug.dumpEnable) {
    return;
  }

  // The scheduler hash is walked under the read side of schLock.
  QW_LOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));

  int32_t idx = 0;
  for (void *pIter = taosHashIterate(mgmt->schHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->schHash, pIter)) {
    qwDbgDumpSchInfo((SQWSchStatus *)pIter, idx);
    ++idx;
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));
}
D
dapan1121 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168

/* Map a QW_PHASE_* value to a printable name; any other value yields "UNKNOWN". */
char *qwPhaseStr(int32_t phase) {
  char *name = "UNKNOWN";

  switch (phase) {
    case QW_PHASE_PRE_QUERY:
      name = "PRE_QUERY";
      break;
    case QW_PHASE_POST_QUERY:
      name = "POST_QUERY";
      break;
    case QW_PHASE_PRE_FETCH:
      name = "PRE_FETCH";
      break;
    case QW_PHASE_POST_FETCH:
      name = "POST_FETCH";
      break;
    case QW_PHASE_PRE_CQUERY:
      name = "PRE_CQUERY";
      break;
    case QW_PHASE_POST_CQUERY:
      name = "POST_CQUERY";
      break;
    default:
      break;
  }

  return name;
}

/* Map a DS_BUF_* value to a printable name; any other value yields "UNKNOWN". */
char *qwBufStatusStr(int32_t bufStatus) {
  char *name = "UNKNOWN";

  switch (bufStatus) {
    case DS_BUF_LOW:
      name = "LOW";
      break;
    case DS_BUF_FULL:
      name = "FULL";
      break;
    case DS_BUF_EMPTY:
      name = "EMPTY";
      break;
    default:
      break;
  }

  return name;
}

D
dapan1121 已提交
169
/*
 * Move task->status to `status` with a compare-and-swap retry loop.
 *
 * Each attempt re-reads the current status and validates the transition with
 * qwDbgValidateStatus(). A validation error is returned to the caller; a transition the
 * validator marks as "ignore" leaves the status untouched and still returns success.
 */
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
  bool skipUpdate = false;

  for (;;) {
    int8_t curStatus = atomic_load_8(&task->status);

    QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), curStatus, status, &skipUpdate));
    if (skipUpdate) {
      break;
    }

    // The CAS returns the value it found; equality with curStatus means the swap took effect.
    if (curStatus == atomic_val_compare_exchange_8(&task->status, curStatus, status)) {
      QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(curStatus), jobTaskStatusStr(status));
      break;
    }

    // Someone else changed the status in between: re-read, re-validate and try again.
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
194
/*
 * Insert a fresh scheduler entry for sId into mgmt->schHash.
 *
 * The entry gets its own task hash. If an entry for sId already exists, the freshly created
 * task hash is discarded and the call still succeeds. rwType is not used here.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when creating the task hash or
 * inserting the entry fails.
 */
int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) {
  SQWSchStatus newSch = {0};

  newSch.tasksHash =
      taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == newSch.tasksHash) {
    QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Only the insertion itself needs the write lock; newSch is still private to this call.
  QW_LOCK(QW_WRITE, &mgmt->schLock);
  int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
  QW_UNLOCK(QW_WRITE, &mgmt->schLock);

  if (0 == code) {
    return TSDB_CODE_SUCCESS;
  }

  if (!HASH_NODE_EXIST(code)) {
    QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
    taosHashCleanup(newSch.tasksHash);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // An entry for sId was already present: drop the unused task hash and report success.
  taosHashCleanup(newSch.tasksHash);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
221
/*
 * Look up the scheduler entry for sId and return it in *sch with mgmt->schLock held in rwType mode.
 *
 * On success the lock stays held; the caller releases it with qwReleaseScheduler().
 * On every failure path the lock has already been released.
 *
 * nOpt selects what happens when the entry is missing:
 *   QW_NOT_EXIST_ADD      create it once via qwAddSchedulerImpl(), then look it up again
 *   QW_NOT_EXIST_RET_ERR  return TSDB_CODE_QRY_SCH_NOT_EXIST
 *   anything else         return TSDB_CODE_QRY_APP_ERROR
 */
int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
  for (;;) {
    QW_LOCK(rwType, &mgmt->schLock);
    *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
    if (NULL != (*sch)) {
      break;
    }

    QW_UNLOCK(rwType, &mgmt->schLock);

    if (QW_NOT_EXIST_RET_ERR == nOpt) {
      QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
    }

    if (QW_NOT_EXIST_ADD != nOpt) {
      QW_SCH_ELOG("unknown notExistOpt:%d", nOpt);
      QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
    }

    QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));

    // Add at most once: if the entry is still missing on the next pass, give up with an error.
    nOpt = QW_NOT_EXIST_RET_ERR;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
248
/* Acquire the scheduler entry for sId, creating it first when it does not exist yet. */
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
  int32_t code = qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
  return code;
}

D
dapan1121 已提交
252
/* Acquire the scheduler entry for sId; a missing entry is reported as an error. */
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
  int32_t code = qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
  return code;
}

D
dapan1121 已提交
256
/* Release mgmt->schLock in the given mode; pairs with a successful scheduler acquire. */
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) {
  QW_UNLOCK(rwType, &mgmt->schLock);
}
D
dapan1121 已提交
257

D
dapan1121 已提交
258
/*
 * Look up the status entry of task (qId, tId) in sch->tasksHash and return it in *task with
 * sch->tasksLock held in rwType mode.
 *
 * On success the lock stays held; the caller releases it with qwReleaseTaskStatus().
 * When the task is missing, *task is NULL, the lock is released and
 * TSDB_CODE_QRY_TASK_NOT_EXIST is returned.
 */
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
  char taskKey[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(taskKey, qId, tId);

  QW_LOCK(rwType, &sch->tasksLock);

  *task = taosHashGet(sch->tasksHash, taskKey, sizeof(taskKey));
  if (NULL == (*task)) {
    QW_UNLOCK(rwType, &sch->tasksLock);
    QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
272
/*
 * Add a status entry for task (qId, tId) to sch->tasksHash, initialised with `status` and rId.
 *
 * When both rwType and task are non-zero, the entry (new or pre-existing) is then acquired via
 * qwAcquireTaskStatus() and returned in *task with the tasks lock held in rwType mode.
 *
 * Returns TSDB_CODE_QRY_TASK_ALREADY_EXIST when the entry exists and no acquire was requested,
 * TSDB_CODE_QRY_OUT_OF_MEMORY on any other insertion failure, or the acquire result.
 */
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
  char taskKey[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(taskKey, qId, tId);

  SQWTaskStatus newTask = {0};
  newTask.status = status;
  newTask.refId = rId;

  // The write lock only covers the insertion; it is dropped before any further work.
  QW_LOCK(QW_WRITE, &sch->tasksLock);
  int32_t code = taosHashPut(sch->tasksHash, taskKey, sizeof(taskKey), &newTask, sizeof(newTask));
  QW_UNLOCK(QW_WRITE, &sch->tasksLock);

  bool wantAcquire = (rwType && task);

  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code));
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    if (!wantAcquire) {
      QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status));
      QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
    }

    // Entry already present and the caller wants it: hand back the existing one.
    QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
  }

  QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status));

  if (wantAcquire) {
    QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
308

D
dapan1121 已提交
309
/*
 * Add a status entry for the task under scheduler sId, creating the scheduler entry if needed.
 * The scheduler is held in read mode only for the duration of the insertion.
 */
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
  SQWSchStatus *schEntry = NULL;

  QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &schEntry));

  // rwType 0 / task NULL: add only, do not acquire the new entry.
  int32_t code = qwAddTaskStatusImpl(QW_FPARAMS(), schEntry, 0, status, NULL);

  qwReleaseScheduler(QW_READ, mgmt);

  QW_RET(code);
}

dengyihao's avatar
dengyihao 已提交
323 324
/* Add a status entry for the task under `sch` and acquire it; thin wrapper over qwAddTaskStatusImpl(). */
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status,
                               SQWTaskStatus **task) {
  int32_t code = qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
  return code;
}
D
dapan1121 已提交
327

dengyihao's avatar
dengyihao 已提交
328
/* Release sch->tasksLock in the given mode; pairs with a successful qwAcquireTaskStatus(). */
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
  QW_UNLOCK(rwType, &sch->tasksLock);
}
D
dapan1121 已提交
329

D
dapan1121 已提交
330
/*
 * Fetch the context of task (qId, tId) from mgmt->ctxHash with taosHashAcquire().
 * A context obtained here is handed back with qwReleaseTaskCtx().
 * Returns TSDB_CODE_QRY_TASK_CTX_NOT_EXIST when no context is found.
 */
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
  char ctxKey[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(ctxKey, qId, tId);

  *ctx = taosHashAcquire(mgmt->ctxHash, ctxKey, sizeof(ctxKey));
  if (NULL != (*ctx)) {
    return TSDB_CODE_SUCCESS;
  }

  QW_TASK_DLOG_E("task ctx not exist, may be dropped");
  QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
343
/*
 * Fetch the context of task (qId, tId) from mgmt->ctxHash with a plain taosHashGet()
 * (no acquire/release pairing, unlike qwAcquireTaskCtx()).
 * Returns TSDB_CODE_QRY_TASK_CTX_NOT_EXIST when no context is found.
 */
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
  char ctxKey[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(ctxKey, qId, tId);

  *ctx = taosHashGet(mgmt->ctxHash, ctxKey, sizeof(ctxKey));
  if (NULL != (*ctx)) {
    return TSDB_CODE_SUCCESS;
  }

  QW_TASK_DLOG_E("task ctx not exist, may be dropped");
  QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
356
/*
 * Insert a zero-initialised context for task (qId, tId) into mgmt->ctxHash.
 *
 * ctx == NULL          add only; an already existing context is TSDB_CODE_QRY_TASK_ALREADY_EXIST
 * ctx != NULL, acquire return the (new or existing) context via qwAcquireTaskCtx()
 * ctx != NULL, !acquire return the (new or existing) context via qwGetTaskCtx()
 *
 * Any other insertion failure is reported as TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
  char ctxKey[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(ctxKey, qId, tId);

  SQWTaskCtx emptyCtx = {0};

  int32_t code = taosHashPut(mgmt->ctxHash, ctxKey, sizeof(ctxKey), &emptyCtx, sizeof(SQWTaskCtx));
  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code);
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    if (NULL == ctx) {
      QW_TASK_ELOG_E("task ctx already exist");
      QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
    }
  }

  // From here on a context exists in the hash, whether it was just added or found present.
  if (NULL == ctx) {
    return TSDB_CODE_SUCCESS;
  }

  if (acquire) {
    QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
  }

  QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
}

dengyihao's avatar
dengyihao 已提交
388
/* Add a context for the task without returning it; fails if one already exists. */
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) {
  QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL));
}
D
dapan1121 已提交
389

dengyihao's avatar
dengyihao 已提交
390
/* Add a context for the task if needed, then acquire it into *ctx. */
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
  int32_t code = qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx);
  return code;
}
D
dapan1121 已提交
391

D
dapan1121 已提交
392
/* Hand a context obtained with qwAcquireTaskCtx() back to mgmt->ctxHash. */
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) {
  taosHashRelease(mgmt->ctxHash, ctx);
}
D
dapan1121 已提交
393

dengyihao's avatar
dengyihao 已提交
394
/*
 * Atomically detach the executor task stored in *taskHandle and destroy it.
 *
 * The slot is cleared with a compare-and-swap so that, of several racing callers, only the one
 * whose swap actually took effect destroys the task. The swap result is compared with the value
 * that was loaded: a result that is merely non-NULL does not prove this caller detached the
 * handle it is about to destroy (the slot may hold a different handle by then).
 */
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
  // Note: free/kill may in RC
  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
  if (NULL == otaskHandle) {
    return;
  }

  // The CAS returns the previous slot value; only an exact match means we own otaskHandle now.
  if (otaskHandle == atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
    qDestroyTask(otaskHandle);
  }
}
D
dapan1121 已提交
401

D
dapan1121 已提交
402 403
/*
 * Ask the executor to kill the task attached to ctx, if there is one.
 *
 * The handle is temporarily detached from ctx->taskHandle with a compare-and-swap, passed to
 * qAsyncKillTask(), and then stored back. The swap result is compared with the value that was
 * loaded, so the kill and the store-back only happen when this caller really detached that
 * handle; otherwise a stale handle could be killed and written back over the slot.
 *
 * Returns the qAsyncKillTask() result, or 0 when there was nothing to kill.
 */
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  int32_t code = 0;

  // Note: free/kill may in RC
  qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
  if (taskHandle && taskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
    code = qAsyncKillTask(taskHandle);
    // Re-attach the handle so it can still be freed later.
    atomic_store_ptr(&ctx->taskHandle, taskHandle);
  }

  QW_RET(code);
}

/*
 * Release what a task context owns: the control connection handle, the executor task handle
 * and the data sink. The corresponding fields in ctx are reset afterwards.
 */
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  SQWConnInfo *ctrlConn = &ctx->ctrlConnInfo;

  tmsgReleaseHandle(ctrlConn->handle, TAOS_CONN_SERVER);
  ctrlConn->handle = NULL;
  ctrlConn->refId = -1;

  // NO need to release dataConnInfo

  qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);

  if (NULL != ctx->sinkHandle) {
    dsDestroyDataSinker(ctx->sinkHandle);
    ctx->sinkHandle = NULL;
  }
}
D
dapan1121 已提交
428

D
dapan1121 已提交
429
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
D
dapan1121 已提交
430 431 432
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
  SQWTaskCtx octx;
D
dapan1121 已提交
433

D
dapan1121 已提交
434 435
  SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == ctx) {
D
dapan1121 已提交
436
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
437
  }
D
dapan1121 已提交
438

D
dapan1121 已提交
439
  octx = *ctx;
D
dapan1121 已提交
440

D
dapan1121 已提交
441 442 443
  atomic_store_ptr(&ctx->taskHandle, NULL);
  atomic_store_ptr(&ctx->sinkHandle, NULL);

D
dapan1121 已提交
444 445
  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);

D
dapan1121 已提交
446
  if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
dengyihao's avatar
dengyihao 已提交
447
    QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
D
dapan1121 已提交
448
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
449 450
  }

D
dapan1121 已提交
451
  qwFreeTask(QW_FPARAMS(), &octx);
D
dapan1121 已提交
452 453

  QW_TASK_DLOG_E("task ctx dropped");
dengyihao's avatar
dengyihao 已提交
454

D
dapan1121 已提交
455 456 457
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
458
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
dengyihao's avatar
dengyihao 已提交
459
  SQWSchStatus * sch = NULL;
D
dapan1121 已提交
460
  SQWTaskStatus *task = NULL;
dengyihao's avatar
dengyihao 已提交
461 462
  int32_t        code = 0;

D
dapan1121 已提交
463 464
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
D
dapan1121 已提交
465

D
dapan1121 已提交
466
  if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
D
dapan1121 已提交
467
    QW_TASK_WLOG_E("scheduler does not exist");
D
dapan1121 已提交
468 469
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
470

D
dapan1121 已提交
471 472
  if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
    qwReleaseScheduler(QW_WRITE, mgmt);
dengyihao's avatar
dengyihao 已提交
473

D
dapan1121 已提交
474
    QW_TASK_WLOG_E("task does not exist");
D
dapan1121 已提交
475 476
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
477

D
dapan1121 已提交
478
  if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
D
dapan1121 已提交
479
    QW_TASK_ELOG_E("taosHashRemove task from hash failed");
D
dapan1121 已提交
480 481 482
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
483
  QW_TASK_DLOG_E("task status dropped");
D
dapan1121 已提交
484 485 486

_return:

D
dapan1121 已提交
487 488 489
  if (task) {
    qwReleaseTaskStatus(QW_WRITE, sch);
  }
D
dapan1121 已提交
490
  qwReleaseScheduler(QW_WRITE, mgmt);
dengyihao's avatar
dengyihao 已提交
491

D
dapan1121 已提交
492 493 494
  QW_RET(code);
}

D
dapan1121 已提交
495
/*
 * Set the status of task (qId, tId) to `status`.
 *
 * The scheduler and the task entry are both held in read mode while qwSetTaskStatus() performs
 * the update. Returns the first error from acquiring either entry or from the status update.
 */
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
  SQWSchStatus * sch = NULL;
  SQWTaskStatus *task = NULL;

  QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));

  int32_t code = qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task);
  if (TSDB_CODE_SUCCESS == code) {
    code = qwSetTaskStatus(QW_FPARAMS(), task, status);

    // The tasks lock is only held when the acquire above succeeded.
    qwReleaseTaskStatus(QW_READ, sch);
  }

  qwReleaseScheduler(QW_READ, mgmt);

  QW_RET(code);
}

D
dapan1121 已提交
515
int32_t qwDropTask(QW_FPARAMS_DEF) {
H
Haojun Liao 已提交
516
  QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
D
dapan1121 已提交
517
  QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
H
Haojun Liao 已提交
518

D
dapan1121 已提交
519 520
  QW_TASK_DLOG_E("task is dropped");

D
dapan1121 已提交
521 522 523
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
524
/*
 * Post-processing once a task has finished executing.
 *
 * Only tasks of type TASK_TYPE_TEMP are touched: for an explain task the execution info is
 * fetched from the executor and sent back over the control connection, then the executor task
 * handle is freed. Other task types are left as they are.
 */
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  if (TASK_TYPE_TEMP != ctx->taskType) {
    return TSDB_CODE_SUCCESS;
  }

  if (ctx->explain) {
    SExplainExecInfo *execInfo = NULL;
    int32_t           resNum = 0;
    QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));

    SQWConnInfo connInfo = {
        .handle = ctx->ctrlConnInfo.handle,
        .refId = ctx->ctrlConnInfo.refId,
    };

    QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
  }

  qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
546
/*
 * Run the executor task of ctx repeatedly, pushing every result block into the data sink.
 *
 * The loop stops when:
 *   - the executor returns no block: the sink is told the input ended, completion handling
 *     runs, and *queryEnd (if given) is set to true;
 *   - the sink asks not to continue;
 *   - a READY event is pending and at least QW_DEFAULT_SHORT_RUN_TIMES runs were done;
 *   - a FETCH event is pending;
 *   - ctx->rspCode has become non-zero.
 *
 * Errors from the executor, the sink or completion handling are returned immediately.
 */
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
  int32_t        code = 0;
  bool           sinkWantsMore = true;
  SSDataBlock *  pRes = NULL;
  uint64_t       useconds = 0;
  int32_t        loopIdx = 0;
  int32_t        execNum = 0;
  DataSinkHandle sinkHandle = ctx->sinkHandle;

  for (;;) {
    QW_TASK_DLOG("start to execTask, loopIdx:%d", loopIdx++);

    // The handle is re-read from ctx on every iteration.
    code = qExecTask(ctx->taskHandle, &pRes, &useconds);
    if (code) {
      QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
    }

    ++execNum;

    if (NULL == pRes) {
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);

      dsEndPut(sinkHandle, useconds);

      QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));

      if (queryEnd) {
        *queryEnd = true;
      }

      break;
    }

    int32_t rows = pRes->info.rows;

    ASSERT(pRes->info.rows > 0);

    SInputData inputData = {.pData = pRes};
    code = dsPutDataBlock(sinkHandle, &inputData, &sinkWantsMore);
    if (code) {
      QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
    }

    QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, sinkWantsMore);

    if (!sinkWantsMore) {
      break;
    }

    // Yield to pending events or a recorded response code (checked in this order).
    if ((QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) ||
        QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH) || atomic_load_32(&ctx->rspCode)) {
      break;
    }
  }

  QW_RET(code);
}
D
dapan1121 已提交
613

D
dapan1121 已提交
614
/*
 * Fill hbInfo with the heartbeat response for one scheduler: its connection info, its endpoint
 * id, and one STaskStatus per entry of sch->tasksHash (query id, task id, status, ref id).
 *
 * The task hash is walked under the read side of sch->tasksLock. qwSetTaskStatus() updates a
 * task's status with a compare-and-swap while holding only read-mode locks, so the status is
 * read with atomic_load_8() here rather than with a plain load.
 *
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY when the status array cannot be allocated.
 */
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
  int32_t taskNum = 0;

  hbInfo->connInfo = sch->hbConnInfo;
  hbInfo->rsp.epId = sch->hbEpId;

  QW_LOCK(QW_READ, &sch->tasksLock);

  taskNum = taosHashGetSize(sch->tasksHash);

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
    QW_UNLOCK(QW_READ, &sch->tasksLock);
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  void *      key = NULL;
  size_t      keyLen = 0;
  STaskStatus status = {0};

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
    key = taosHashGetKey(pIter, &keyLen);

    // TODO GET EXECUTOR API TO GET MORE INFO

    // The hash key carries the query id and task id of the entry.
    QW_GET_QTID(key, status.queryId, status.taskId);
    status.status = atomic_load_8(&taskStatus->status);
    status.refId = taskStatus->refId;

    // NOTE(review): the taosArrayPush() result is not checked; a failed push drops this entry silently.
    taosArrayPush(hbInfo->rsp.taskStatus, &status);

    pIter = taosHashIterate(sch->tasksHash, pIter);
  }

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
658
/*
 * Pull the next result from the task's data sink and prepare a fetch response for it.
 *
 * Depending on what dsGetDataLength() reports:
 *   - negative length: TSDB_CODE_QRY_INVALID_INPUT;
 *   - no data, query still running: pOutput->bufStatus is set to DS_BUF_EMPTY, nothing else
 *     is written (*rspMsg and *dataLen are left untouched);
 *   - no data, query ended: the final block info is read, the task is marked SUCCEED, and an
 *     empty response is allocated (*dataLen = 0);
 *   - data available: a response of that size is allocated, the block is copied into it via
 *     dsGetDataBlock(), and the task is marked SUCCEED once the sink is empty and the query ended.
 *
 * *rspMsg receives the allocated response. The result of qwUpdateTaskStatus() is not checked.
 */
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
  int32_t            code = 0;
  int32_t            sinkLen = 0;
  bool               sinkDone = false;
  SRetrieveTableRsp *fetchRsp = NULL;

  dsGetDataLength(ctx->sinkHandle, &sinkLen, &sinkDone);

  if (sinkLen < 0) {
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", sinkLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Nothing buffered yet and the query is still producing: just report an empty buffer.
  if (0 == sinkLen && !sinkDone) {
    pOutput->bufStatus = DS_BUF_EMPTY;

    return TSDB_CODE_SUCCESS;
  }

  // Nothing buffered and the query has ended: answer with an empty response.
  if (0 == sinkLen) {
    code = dsGetDataBlock(ctx->sinkHandle, pOutput);
    if (code) {
      QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
    }

    QW_TASK_DLOG_E("no data in sink and query end");

    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
    QW_ERR_RET(qwMallocFetchRsp(sinkLen, &fetchRsp));

    *rspMsg = fetchRsp;
    *dataLen = 0;
    return TSDB_CODE_SUCCESS;
  }

  // Got data from sink
  QW_TASK_DLOG("there are data in sink, dataLength:%d", sinkLen);

  *dataLen = sinkLen;

  QW_ERR_RET(qwMallocFetchRsp(sinkLen, &fetchRsp));
  *rspMsg = fetchRsp;

  // The sink writes the block straight into the response payload.
  pOutput->pData = fetchRsp->data;
  code = dsGetDataBlock(ctx->sinkHandle, pOutput);
  if (code) {
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_RET(code);
  }

  if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
    QW_TASK_DLOG_E("task all data fetched, done");
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
717
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
dengyihao's avatar
dengyihao 已提交
718 719
  int32_t      code = 0;
  SQWTaskCtx * ctx = NULL;
D
dapan1121 已提交
720 721
  SQWConnInfo *dropConnection = NULL;
  SQWConnInfo *cancelConnection = NULL;
D
dapan1121 已提交
722

D
dapan1121 已提交
723
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
724 725 726 727 728 729

  if (QW_PHASE_PRE_QUERY == phase) {
    QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
  } else {
    QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
  }
dengyihao's avatar
dengyihao 已提交
730

D
dapan1121 已提交
731
  QW_LOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
732

D
dapan1121 已提交
733
  if (QW_PHASE_PRE_FETCH == phase) {
dengyihao's avatar
dengyihao 已提交
734
    atomic_store_8((int8_t *)&ctx->queryFetched, true);
D
dapan1121 已提交
735
  } else {
D
dapan1121 已提交
736 737
    atomic_store_8(&ctx->phase, phase);
  }
D
dapan1121 已提交
738

dengyihao's avatar
dengyihao 已提交
739
  if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
D
dapan1121 已提交
740 741 742
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }
D
dapan1121 已提交
743

D
dapan1121 已提交
744 745
  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
D
dapan1121 已提交
746
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
H
Haojun Liao 已提交
747
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
748
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
749 750
        break;
      }
D
dapan1121 已提交
751

D
dapan1121 已提交
752
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
753
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
754
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
755
        dropConnection = NULL;
756

D
dapan1121 已提交
757 758
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
759

D
dapan1121 已提交
760
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan 已提交
761
        break;
D
dapan1121 已提交
762
      }
D
dapan1121 已提交
763

D
dapan1121 已提交
764
      QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
D
dapan1121 已提交
765 766
      break;
    }
D
dapan1121 已提交
767
    case QW_PHASE_PRE_FETCH: {
D
dapan1121 已提交
768 769
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
770 771
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }
D
dapan1121 已提交
772

D
dapan1121 已提交
773
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
774
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
775 776 777 778
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

      if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
779
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
780
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
D
dapan1121 已提交
781 782
      }
      break;
dengyihao's avatar
dengyihao 已提交
783
    }
D
dapan1121 已提交
784 785
    case QW_PHASE_PRE_CQUERY: {
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
786
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
787
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
788
      }
D
dapan1121 已提交
789

D
dapan1121 已提交
790
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
791
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
792
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
793
        dropConnection = NULL;
H
Haojun Liao 已提交
794

D
dapan1121 已提交
795 796
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
797

D
dapan1121 已提交
798
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
799
      }
D
dapan1121 已提交
800

D
dapan1121 已提交
801
      break;
D
dapan1121 已提交
802 803 804 805 806 807 808
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
dengyihao's avatar
dengyihao 已提交
809 810
    QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
D
dapan1121 已提交
811
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
812
  }
D
dapan1121 已提交
813

D
dapan1121 已提交
814
_return:
D
dapan1121 已提交
815

D
dapan1121 已提交
816
  if (ctx) {
D
dapan1121 已提交
817
    QW_UPDATE_RSP_CODE(ctx, code);
dengyihao's avatar
dengyihao 已提交
818

D
dapan1121 已提交
819
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
820 821
    qwReleaseTaskCtx(mgmt, ctx);
  }
D
dapan1121 已提交
822

D
dapan1121 已提交
823
  if (dropConnection) {
S
shm  
Shengliang Guan 已提交
824
    qwBuildAndSendDropRsp(dropConnection, code);
D
dapan1121 已提交
825
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
826
  }
D
dapan1121 已提交
827

D
dapan1121 已提交
828
  if (cancelConnection) {
S
shm  
Shengliang Guan 已提交
829
    qwBuildAndSendCancelRsp(cancelConnection, code);
D
dapan1121 已提交
830
    QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
831 832
  }

D
dapan1121 已提交
833
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
834 835 836 837 838

  QW_RET(code);
}

// Bookkeeping that runs after a processing phase (post query / cquery / fetch).
//
// With the task context write-locked this function:
//   - fails with TSDB_CODE_QRY_TASK_DROPPED if the drop event was already processed;
//   - for the post-query phase, snapshots the control connection so that the
//     ready response can be sent after the lock is released;
//   - services a pending drop request (not allowed in the post-fetch phase);
//   - propagates ctx->rspCode and then input->code as the resulting code.
//
// On the way out it passes the resulting code to QW_UPDATE_RSP_CODE, stores the
// phase in the context (except for post fetch), sends the ready response if one
// was prepared, and marks the task FAILED when the code is non-zero.
//
// `output` is not used here. The function finishes with QW_RET(code).
int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
  int32_t      code = 0;
  SQWTaskCtx * ctx = NULL;
  SQWConnInfo  readyInfo = {0};
  SQWConnInfo *pReadyConn = NULL;
  const bool   isPostQuery = (QW_PHASE_POST_QUERY == phase);
  const bool   isPostFetch = (QW_PHASE_POST_FETCH == phase);

  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (isPostQuery) {
    // Copy the connection identity into a local: the response is sent after
    // the context has been unlocked and released.
    readyInfo.handle = ctx->ctrlConnInfo.handle;
    readyInfo.refId = ctx->ctrlConnInfo.refId;
    pReadyConn = &readyInfo;

    QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
  }

  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    if (isPostFetch) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    // The drop response goes out before the task itself is dropped.
    qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));

    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (ctx->rspCode) {
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
    QW_ERR_JRET(ctx->rspCode);
  }

  // Finally surface the result of the phase that just finished.
  QW_ERR_JRET(input->code);

_return:

  if (isPostQuery && TSDB_CODE_SUCCESS == code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
  }

  if (NULL != ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);

    if (!isPostFetch) {
      atomic_store_8(&ctx->phase, phase);
    }

    QW_UNLOCK(QW_WRITE, &ctx->lock);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  if (NULL != pReadyConn) {
    qwBuildAndSendReadyRsp(pReadyConn, code);
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", pReadyConn->handle, code, tstrerror(code));
  }

  if (TSDB_CODE_SUCCESS != code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
  }

  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));

  QW_RET(code);
}

D
dapan1121 已提交
922
// Handles a QUERY message: turns the serialized subplan carried by `qwMsg`
// into an execution task and starts running it.
//
// Steps: register the broken-link argument for the connection, run the
// pre-query phase checks, record the task type / explain flag and the control
// connection in the task context, parse the subplan, create the exec task and
// its data sink, then execute.
//
// Whatever happens, the outcome is handed to qwHandlePostPhaseEvents() with
// QW_PHASE_POST_QUERY through `input.code`. This function itself always
// returns TSDB_CODE_SUCCESS.
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
  int32_t          code = 0;
  struct SSubplan *plan = NULL;
  SQWPhaseInput    input = {0};
  qTaskInfo_t      pTaskInfo = NULL;
  DataSinkHandle   sinkHandle = NULL;
  SQWTaskCtx *     ctx = NULL;

  QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  atomic_store_8(&ctx->taskType, taskType);
  atomic_store_8(&ctx->explain, explain);

  // Remember where control responses for this task have to be sent.
  atomic_store_ptr(&ctx->ctrlConnInfo.handle, qwMsg->connInfo.handle);
  atomic_store_ptr(&ctx->ctrlConnInfo.ahandle, qwMsg->connInfo.ahandle);
  atomic_store_64(&ctx->ctrlConnInfo.refId, qwMsg->connInfo.refId);

  QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);

  code = qStringToSubplan(qwMsg->msg, &plan);
  if (TSDB_CODE_SUCCESS != code) {
    // Any parse failure is reported to the peer as an invalid message.
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

  // Both handles are known to be non-NULL here (guard above), so execute directly.
  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));

_return:

  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
989
// Handles a READY message for a task.
//
// With the task context write-locked:
//   - a task that is dropping / already dropped yields TSDB_CODE_QRY_TASK_DROPPED;
//   - if the task is still in the pre-query phase, the connection is stored in
//     the context, the READY event is marked as received and no response is
//     sent from here;
//   - otherwise the READY event is marked processed and the response carries
//     ctx->rspCode when the task is in the post-query phase, or an error for
//     any other state.
//
// A non-zero code is passed to QW_UPDATE_RSP_CODE and the task is marked FAILED.
// The function always returns TSDB_CODE_SUCCESS; the result travels in the
// ready response.
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;
  bool        needRsp = true;

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG_E("task is dropping or already dropped");
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (QW_PHASE_PRE_QUERY == ctx->phase) {
    // Query has not finished its first run yet: park the connection and defer the reply.
    ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
    ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
    needRsp = false;
    QW_TASK_DLOG_E("ready msg will not rsp now");
    goto _return;
  }

  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);

  if (atomic_load_8((int8_t *)&ctx->queryEnd) || atomic_load_8((int8_t *)&ctx->queryFetched)) {
    QW_TASK_ELOG("got ready msg at wrong status, queryEnd:%d, queryFetched:%d", atomic_load_8((int8_t *)&ctx->queryEnd),
                 atomic_load_8((int8_t *)&ctx->queryFetched));
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }

  if (QW_PHASE_POST_QUERY == ctx->phase) {
    code = ctx->rspCode;
    goto _return;
  }

  QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase));

  QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);

_return:

  if (code) {
    if (ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }

    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
  }

  // A non-NULL ctx implies the lock was taken right after the acquire above.
  if (ctx) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  if (needRsp) {
    qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  }

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
1053
// Handles a CQUERY (continue query) message.
//
// Each loop iteration runs the pre-cquery phase checks, clears the
// queryInQueue / queryContinue flags, executes the task once and, when a FETCH
// event is pending, tries to answer it from the data sink. The loop repeats
// while ctx->queryContinue is set and neither the query ended nor an error
// occurred.
//
// NOTE: the `_return` label lives inside the loop on purpose - QW_ERR_JRET
// jumps to the per-iteration cleanup, which then decides whether to leave.
//
// After the loop the final code is handed to qwHandlePostPhaseEvents() with
// QW_PHASE_POST_CQUERY. The function always returns TSDB_CODE_SUCCESS.
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  SQWTaskCtx *  ctx = NULL;
  int32_t       code = 0;
  SQWPhaseInput input = {0};
  void *        rsp = NULL;
  int32_t       dataLen = 0;
  bool          queryEnd = false;

  do {
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));

    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

    atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t *)&ctx->queryContinue, 0);

    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));

    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      SOutputData sOutput = {0};
      QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));

      if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));

        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
      }

      if (rsp) {
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);

        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
        if (qComplete) {
          atomic_store_8((int8_t *)&ctx->queryEnd, true);
        }

        qwMsg->connInfo = ctx->dataConnInfo;
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

        qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
                     tstrerror(code), dataLen);

        // The buffer has been handed to the send call (this function never
        // frees it after a successful send). Forget it so that a later
        // iteration can neither free it again on the error path nor re-send
        // a stale pointer.
        rsp = NULL;
        dataLen = 0;
      } else {
        // Nothing to return yet: keep producing data for the pending fetch.
        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
      }
    }

  _return:

    if (NULL == ctx) {
      break;
    }

    if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      // Fail the pending fetch: release any unsent buffer and reply with the error code only.
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
      qwFreeFetchRsp(rsp);
      rsp = NULL;

      qwMsg->connInfo = ctx->dataConnInfo;
      qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                   0);
    }

    QW_LOCK(QW_WRITE, &ctx->lock);
    if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
      // Note: if necessary, fetch need to put cquery to queue again
      atomic_store_8(&ctx->phase, 0);
      QW_UNLOCK(QW_WRITE, &ctx->lock);
      break;
    }
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  } while (true);

  input.code = code;
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);

  QW_RET(TSDB_CODE_SUCCESS);
}
D
dapan1121 已提交
1132

D
dapan1121 已提交
1133
// Handles a FETCH message.
//
// After the pre-fetch phase checks it asks the data sink for a result block.
// If none is available, the data connection is stored in the task context and
// the FETCH event is marked as received; otherwise the response is built in
// place and queryEnd is set once the sink is empty and the query has ended.
//
// When the query has not ended and the sink buffer is low or empty, the query
// is either told to continue (if it is currently running) or re-queued via a
// CQUERY message.
//
// The outcome passes through qwHandlePostPhaseEvents() with
// QW_PHASE_POST_FETCH; a response is sent from here when there is data or an
// error. The function always returns TSDB_CODE_SUCCESS.
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  int32_t       code = 0;
  int32_t       dataLen = 0;
  bool          ctxLocked = false;
  SQWTaskCtx *  ctx = NULL;
  void *        rsp = NULL;
  SQWPhaseInput input = {0};
  SOutputData   sOutput = {0};

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));

  if (NULL != rsp) {
    bool qComplete = (sOutput.queryEnd && DS_BUF_EMPTY == sOutput.bufStatus);

    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
    if (qComplete) {
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
    }
  } else {
    // No data yet: remember who asked so the reply can be sent later.
    atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle);
    atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle);

    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
  }

  const bool bufNeedsData = (DS_BUF_LOW == sOutput.bufStatus) || (DS_BUF_EMPTY == sOutput.bufStatus);

  if (!sOutput.queryEnd && bufNeedsData) {
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));

    QW_LOCK(QW_WRITE, &ctx->lock);
    ctxLocked = true;

    // RC WARNING
    if (QW_IS_QUERY_RUNNING(ctx)) {
      atomic_store_8((int8_t *)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);

      atomic_store_8((int8_t *)&ctx->queryInQueue, 1);

      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
    }
  }

_return:

  if (ctxLocked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);

  if (TSDB_CODE_SUCCESS != code) {
    // On failure only the error code is returned; drop any prepared data.
    qwFreeFetchRsp(rsp);
    rsp = NULL;
    dataLen = 0;
  }

  if (code || rsp) {
    qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
    QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                 dataLen);
  }

  QW_RET(TSDB_CODE_SUCCESS);
}
D
dapan1121 已提交
1203

D
dapan1121 已提交
1204
// Handles a DROP message for a task.
//
// The task context is created if missing and write-locked. A second drop
// request while one is pending yields TSDB_CODE_QRY_DUPLICATTED_OPERATION.
// Otherwise:
//   - query running: the task handle is killed and the status becomes DROPPING;
//   - task idle but already started (phase > 0): the response is sent
//     (only when qwMsg->code is 0) and the task is dropped immediately;
//   - task not started: nothing is done right away.
// Unless the task was dropped immediately, the connection is stored in the
// context and the DROP event is marked as received.
//
// On error the code is passed to QW_UPDATE_RSP_CODE, the task is marked FAILED
// and an error response is sent (only when qwMsg->code is 0). The function
// always returns TSDB_CODE_SUCCESS.
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;
  bool        ctxLocked = false;
  bool        droppedNow = false;

  // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED

  QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);
  ctxLocked = true;

  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG_E("task already dropping");
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

  if (QW_IS_QUERY_RUNNING(ctx)) {
    QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
  } else if (ctx->phase > 0) {
    if (0 == qwMsg->code) {
      qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
      QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
    }

    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    droppedNow = true;
  }
  // else: task not started

  if (!droppedNow) {
    // The drop is pending: keep the connection so the response can be sent
    // once the event is processed.
    ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
    ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;

    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    if (NULL != ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }

    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
  }

  if (ctxLocked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  if (NULL != ctx) {
    qwReleaseTaskCtx(mgmt, ctx);
  }

  if ((0 == qwMsg->code) && (TSDB_CODE_SUCCESS != code)) {
    qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  }

  QW_RET(TSDB_CODE_SUCCESS);
}
D
dapan1121 已提交
1270

D
dapan1121 已提交
1271
// Handles a heartbeat message that reports a broken link.
//
// If the reported connection handle is the one currently stored for the
// scheduler `req->sId`, the handle is released and the stored handle/ahandle
// are cleared; a notification for any other handle is only logged.
//
// Returns the error from qwAcquireAddScheduler() if the scheduler cannot be
// acquired, otherwise TSDB_CODE_SUCCESS.
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t       code = 0;  // kept in scope in case the QW_* macros reference it
  SQWSchStatus *sch = NULL;

  QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

  QW_LOCK(QW_WRITE, &sch->hbConnLock);

  if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
    tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
    sch->hbConnInfo.handle = NULL;
    sch->hbConnInfo.ahandle = NULL;

    QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
  } else {
    QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
  }

  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);

  qwReleaseScheduler(QW_READ, mgmt);

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
1297
// Handles a scheduler heartbeat message.
//
// A message carrying a non-zero code is treated as a broken-link notification
// and delegated to qwProcessHbLinkBroken(). Otherwise the connection in
// `qwMsg` replaces the stored heartbeat connection of scheduler `req->sId`
// (releasing the previous handle, if any) and the endpoint id is recorded.
//
// A heartbeat response carrying the resulting code is always sent; on failure
// the handle of the incoming message is released as well. Apart from the
// delegated case, the function returns TSDB_CODE_SUCCESS.
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t         code = 0;
  SSchedulerHbRsp rsp = {0};
  SQWSchStatus *  sch = NULL;

  if (qwMsg->code) {
    QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
  }

  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

  code = qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo);
  if (code) {
    // The scheduler was acquired above; release it before bailing out, just
    // as the success path below does, so the acquire is never left unpaired.
    qwReleaseScheduler(QW_READ, mgmt);
    QW_ERR_JRET(code);
  }

  QW_LOCK(QW_WRITE, &sch->hbConnLock);

  if (sch->hbConnInfo.handle) {
    // Replace the previous heartbeat connection.
    tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
  }

  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));

  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);

  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
          req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);

  qwReleaseScheduler(QW_READ, mgmt);

_return:

  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));

  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);

  if (code) {
    tmsgReleaseHandle(qwMsg->connInfo.handle, TAOS_CONN_SERVER);
  }

  QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));

  QW_RET(TSDB_CODE_SUCCESS);
}

void qwProcessHbTimerEvent(void *param, void *tmrId) {
D
dapan1121 已提交
1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354
  SQWHbParam* hbParam = (SQWHbParam*)param;
  if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
    return;
  }

  int64_t refId = hbParam->refId;
  SQWorker *mgmt = qwAcquire(refId);
  if (NULL == mgmt) {
    QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
    taosMemoryFree(param);
    return;
  }
  
D
dapan1121 已提交
1355
  SQWSchStatus *sch = NULL;
dengyihao's avatar
dengyihao 已提交
1356 1357 1358
  int32_t       taskNum = 0;
  SQWHbInfo *   rspList = NULL;
  int32_t       code = 0;
D
dapan1121 已提交
1359

D
dapan1121 已提交
1360 1361
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
1362 1363 1364 1365 1366
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1367
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1368
    qwRelease(refId);
D
dapan1121 已提交
1369
    return;
D
dapan1121 已提交
1370 1371
  }

wafwerar's avatar
wafwerar 已提交
1372
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
1373 1374
  if (NULL == rspList) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1375 1376
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1377
    qwRelease(refId);
D
dapan1121 已提交
1378
    return;
D
dapan1121 已提交
1379 1380
  }

dengyihao's avatar
dengyihao 已提交
1381 1382
  void *  key = NULL;
  size_t  keyLen = 0;
D
dapan1121 已提交
1383 1384 1385 1386
  int32_t i = 0;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
D
dapan1121 已提交
1387 1388 1389 1390 1391 1392 1393
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
    if (NULL == sch->hbConnInfo.handle) {
      uint64_t *sId = taosHashGetKey(pIter, NULL);
      QW_DLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
      pIter = taosHashIterate(mgmt->schHash, pIter);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1394

D
dapan1121 已提交
1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
1410
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
1411 1412
    QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),
            (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
D
dapan1121 已提交
1413
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
1414 1415
  }

wafwerar's avatar
wafwerar 已提交
1416
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
1417

dengyihao's avatar
dengyihao 已提交
1418
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1419 1420 1421 1422 1423 1424 1425 1426 1427 1428
  qwRelease(refId);  
}

// Closes the global qworker ref set once no worker instance is left.
// The check and the close both happen under the gQwMgmt write latch.
void qwCloseRef(void) {
  taosWLockLatch(&gQwMgmt.lock);

  const bool noWorkerLeft = (atomic_load_32(&gQwMgmt.qwNum) <= 0);
  if (noWorkerLeft && gQwMgmt.qwRef >= 0) {
    taosCloseRef(gQwMgmt.qwRef);
    gQwMgmt.qwRef = -1;  // mark the ref set as closed
  }

  taosWUnLockLatch(&gQwMgmt.lock);
}

D
dapan1121 已提交
1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465
void qwDestroyImpl(void *pMgmt) {
  SQWorker *mgmt = (SQWorker *)pMgmt;

  taosTmrStopA(&mgmt->hbTimer);
  taosTmrCleanUp(mgmt->timer);

  // TODO STOP ALL QUERY

  // TODO FREE ALL

  taosHashCleanup(mgmt->ctxHash);
  taosHashCleanup(mgmt->schHash);

  taosMemoryFree(mgmt);

  atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);  

  qwCloseRef();
}

// Opens the global qworker ref set if it is not open yet (gQwMgmt.qwRef < 0),
// registering qwDestroyImpl as its destructor. Runs under the gQwMgmt write
// latch.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the ref set
// could not be opened.
int32_t qwOpenRef(void) {
  bool refReady = true;

  taosWLockLatch(&gQwMgmt.lock);
  if (gQwMgmt.qwRef < 0) {
    gQwMgmt.qwRef = taosOpenRef(100, qwDestroyImpl);
    refReady = (gQwMgmt.qwRef >= 0);
  }
  taosWUnLockLatch(&gQwMgmt.lock);

  if (!refReady) {
    qError("init qworker ref failed");
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487
/*
 * Reserve the next slot of the global heartbeat-parameter ring and fill it.
 *
 * gQwMgmt.paramIdx is advanced with a compare-and-swap loop so that concurrent
 * callers each obtain a distinct cursor value. The cursor wraps to 0 before it
 * can reach tListLen(gQwMgmt.param), so the chosen slot is always inside the
 * array (previously the cursor could equal the array length, which made the
 * writes below land one element past the end). Slots are reused once the ring
 * wraps around.
 *
 * refId:  ref id of the qworker the heartbeat timer belongs to
 * pParam: out, receives the address of the filled slot
 */
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
  const int32_t paramNum = (int32_t)tListLen(gQwMgmt.param);
  int32_t       paramIdx = 0;
  int32_t       newParamIdx = 0;

  while (true) {
    paramIdx = atomic_load_32(&gQwMgmt.paramIdx);
    newParamIdx = (paramIdx + 1 >= paramNum) ? 0 : (paramIdx + 1);

    if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) {
      break;
    }
  }

  // defensive: never index outside the array even if the shared cursor held an out-of-range value
  if (paramIdx < 0 || paramIdx >= paramNum) {
    paramIdx = 0;
  }

  gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef;
  gQwMgmt.param[paramIdx].refId = refId;

  *pParam = &gQwMgmt.param[paramIdx];
}
D
dapan1121 已提交
1488

S
Shengliang Guan 已提交
1489 1490
/*
 * Create a query worker instance and register it in the global ref set.
 *
 * nodeType, nodeId: stored in the new instance
 * cfg:              optional limits; a NULL pointer or zero-valued fields fall
 *                   back to the QW_DEFAULT_* values
 * qWorkerMgmt:      out, receives the new instance on success (untouched on failure)
 * pMsgCb:           message callbacks, copied into the instance; must be
 *                   non-NULL and carry a non-NULL pWrapper
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for bad arguments,
 * TSDB_CODE_QRY_OUT_OF_MEMORY when an allocation/hash/timer step fails, or the
 * terrno left by taosAddRef().
 *
 * On failure everything acquired so far is rolled back: before the instance is
 * registered it is torn down by hand, afterwards by releasing its ref.
 */
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
  if (NULL == qWorkerMgmt || NULL == pMsgCb || pMsgCb->pWrapper == NULL) {
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // the first worker resets the shared heartbeat parameter slots
  int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
  if (1 == qwNum) {
    memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
  }

  int32_t code = qwOpenRef();
  if (code) {
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
    QW_RET(code);
  }

  SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
  if (NULL == mgmt) {
    qError("calloc %d failed", (int32_t)sizeof(SQWorker));
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // calloc leaves refId at 0, which the cleanup code would mistake for a registered
  // instance; mark it as "not registered yet" until taosAddRef() succeeds
  mgmt->refId = -1;

  if (cfg) {
    mgmt->cfg = *cfg;
    if (0 == mgmt->cfg.maxSchedulerNum) {
      mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
    }
    if (0 == mgmt->cfg.maxTaskNum) {
      mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
    }
    if (0 == mgmt->cfg.maxSchTaskNum) {
      mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
    }
  } else {
    mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
    mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
    mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
  }

  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                               HASH_ENTRY_LOCK);
  if (NULL == mgmt->schHash) {
    // mgmt is released at _return; freeing it here would make the log line and the cleanup read a dead pointer
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->ctxHash =
      taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (NULL == mgmt->ctxHash) {
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
  mgmt->msgCb = *pMsgCb;

  mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
  if (mgmt->refId < 0) {
    qError("taosAddRef qw failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(terrno);
  }

  SQWHbParam *param = NULL;
  qwSetHbParam(mgmt->refId, &param);

  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer);
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  *qWorkerMgmt = mgmt;

  qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);

  return TSDB_CODE_SUCCESS;

_return:

  if (mgmt->refId >= 0) {
    // already registered: hand the teardown over to the ref set
    qwRelease(mgmt->refId);
  } else {
    // not registered: release whatever was created so far by hand
    if (mgmt->schHash) {
      taosHashCleanup(mgmt->schHash);
    }
    if (mgmt->ctxHash) {
      taosHashCleanup(mgmt->ctxHash);
    }
    if (mgmt->timer) {
      taosTmrCleanUp(mgmt->timer);
    }
    taosMemoryFreeClear(mgmt);

    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
  }

  QW_RET(code);
}
D
dapan1121 已提交
1591 1592 1593 1594

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1595
  }
D
dapan 已提交
1596

D
dapan1121 已提交
1597
  SQWorker *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1598

D
dapan1121 已提交
1599 1600 1601
  if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
    qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
  }
D
dapan1121 已提交
1602
}
D
dapan1121 已提交
1603

D
dapan1121 已提交
1604
/*
 * Stub: report the status of the tasks of scheduler `sId`.
 *
 * The implementation is currently disabled (kept below for reference), so the
 * arguments are ignored, `*rsp` is NOT written and TSDB_CODE_SUCCESS is
 * returned unconditionally.
 */
int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
  /*
    Disabled implementation: allocates *rsp and copies queryId/taskId/status of
    every entry of sch->tasksHash into it.

    SQWSchStatus *sch = NULL;
    int32_t taskNum = 0;

    QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));

    sch->lastAccessTs = taosGetTimestampSec();

    QW_LOCK(QW_READ, &sch->tasksLock);

    taskNum = taosHashGetSize(sch->tasksHash);

    int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
    *rsp = taosMemoryCalloc(1, size);
    if (NULL == *rsp) {
      QW_SCH_ELOG("calloc %d failed", size);
      QW_UNLOCK(QW_READ, &sch->tasksLock);
      qwReleaseScheduler(QW_READ, mgmt);

      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }

    void *key = NULL;
    size_t keyLen = 0;
    int32_t i = 0;

    void *pIter = taosHashIterate(sch->tasksHash, NULL);
    while (pIter) {
      SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
      taosHashGetKey(pIter, &key, &keyLen);

      QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
      (*rsp)->status[i].status = taskStatus->status;

      ++i;
      pIter = taosHashIterate(sch->tasksHash, pIter);
    }

    QW_UNLOCK(QW_READ, &sch->tasksLock);
    qwReleaseScheduler(QW_READ, mgmt);

    (*rsp)->num = taskNum;
  */
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1650

D
dapan1121 已提交
1651
/*
 * Stub: refresh the last-access timestamp of scheduler `sId`.
 *
 * The implementation is currently disabled (kept below for reference), so the
 * arguments are ignored and TSDB_CODE_SUCCESS is returned unconditionally.
 * The local that only the disabled code needed now lives inside that
 * reference snippet instead of being declared and left unused.
 */
int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
  /*
    SQWSchStatus *sch = NULL;

    QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));

    sch->lastAccessTs = taosGetTimestampSec();

    qwReleaseScheduler(QW_READ, mgmt);
  */
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1664
/*
 * Stub: look up the status of task (sId, qId, tId).
 *
 * The implementation is currently disabled (kept below for reference), so the
 * arguments are ignored and the function returns 0 through QW_RET().
 * The locals that only the disabled code needed now live inside that
 * reference snippet instead of being declared and left unused.
 *
 * NOTE(review): `*taskStatus` is never written by the stub; callers must not
 * rely on it being set - confirm against the call sites.
 * NOTE(review): the disabled code refers to `queryId`/`taskId`, which are not
 * parameters of this function (they are named `qId`/`tId`); it needs adapting
 * before it can be re-enabled.
 */
int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
  int32_t code = 0;

  /*
    SQWSchStatus  *sch = NULL;
    SQWTaskStatus *task = NULL;

    if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
      *taskStatus = JOB_TASK_STATUS_NULL;
      return TSDB_CODE_SUCCESS;
    }

    if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) {
      qwReleaseScheduler(QW_READ, mgmt);

      *taskStatus = JOB_TASK_STATUS_NULL;
      return TSDB_CODE_SUCCESS;
    }

    *taskStatus = task->status;

    qwReleaseTask(QW_READ, sch);
    qwReleaseScheduler(QW_READ, mgmt);
  */

  QW_RET(code);
}

D
dapan1121 已提交
1691
/*
 * Stub: cancel task (sId, qId, tId).
 *
 * The implementation is currently disabled (kept below for reference), so the
 * arguments are ignored and the function returns 0 through QW_RET().
 * The locals that only the disabled code needed now live inside that
 * reference snippet instead of being declared and left unused.
 */
int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
  int32_t code = 0;

  /*
    SQWSchStatus  *sch = NULL;
    SQWTaskStatus *task = NULL;

    QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));

    QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));

    QW_LOCK(QW_WRITE, &task->lock);

    task->cancel = true;

    int8_t oriStatus = task->status;
    int8_t newStatus = 0;

    if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START ||
        task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) {
      QW_UNLOCK(QW_WRITE, &task->lock);

      qwReleaseTask(QW_READ, sch);
      qwReleaseScheduler(QW_READ, mgmt);

      return TSDB_CODE_SUCCESS;
    } else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED ||
               task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
    } else {
      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLING));
    }

    QW_UNLOCK(QW_WRITE, &task->lock);

    qwReleaseTask(QW_READ, sch);
    qwReleaseScheduler(QW_READ, mgmt);

    if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
      //TODO call executer to cancel subquery async
    }

    return TSDB_CODE_SUCCESS;

  _return:

    if (task) {
      QW_UNLOCK(QW_WRITE, &task->lock);

      qwReleaseTask(QW_READ, sch);
    }

    if (sch) {
      qwReleaseScheduler(QW_READ, mgmt);
    }
  */

  QW_RET(code);
}