qworker.c 48.4 KB
Newer Older
D
dapan1121 已提交
1
#include "qworker.h"
dengyihao's avatar
dengyihao 已提交
2
#include "dataSinkMgt.h"
3
#include "executor.h"
D
dapan1121 已提交
4
#include "planner.h"
H
Haojun Liao 已提交
5 6
#include "query.h"
#include "qworkerInt.h"
D
dapan1121 已提交
7
#include "qworkerMsg.h"
dengyihao's avatar
dengyihao 已提交
8
#include "tcommon.h"
H
Haojun Liao 已提交
9
#include "tmsg.h"
10
#include "tname.h"
D
dapan1121 已提交
11

D
dapan1121 已提交
12
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true};
D
dapan1121 已提交
13 14 15 16 17
SQWorkerMgmt gQwMgmt = {
  .lock  = 0,
  .qwRef = -1, 
  .qwNum = 0,
};
18

D
dapan1121 已提交
19 20 21
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
22
  }
23

D
dapan1121 已提交
24
  int32_t code = 0;
D
dapan1121 已提交
25

D
dapan1121 已提交
26
  if (oriStatus == newStatus) {
D
dapan1121 已提交
27 28 29 30
    if (newStatus == JOB_TASK_STATUS_EXECUTING || newStatus == JOB_TASK_STATUS_FAILED) {
      *ignore = true;
      return TSDB_CODE_SUCCESS;
    }
H
Haojun Liao 已提交
31

D
dapan1121 已提交
32 33
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }
dengyihao's avatar
dengyihao 已提交
34

D
dapan1121 已提交
35 36
  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
dengyihao's avatar
dengyihao 已提交
37 38
      if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED &&
          newStatus != JOB_TASK_STATUS_NOT_START) {
D
dapan1121 已提交
39 40
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
dengyihao's avatar
dengyihao 已提交
41

D
dapan1121 已提交
42 43
      break;
    case JOB_TASK_STATUS_NOT_START:
D
dapan1121 已提交
44
      if (newStatus != JOB_TASK_STATUS_CANCELLED) {
D
dapan1121 已提交
45 46
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
dengyihao's avatar
dengyihao 已提交
47

D
dapan1121 已提交
48 49
      break;
    case JOB_TASK_STATUS_EXECUTING:
dengyihao's avatar
dengyihao 已提交
50 51 52
      if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED && newStatus != JOB_TASK_STATUS_SUCCEED &&
          newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING &&
          newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
53 54
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
dengyihao's avatar
dengyihao 已提交
55

D
dapan1121 已提交
56 57
      break;
    case JOB_TASK_STATUS_PARTIAL_SUCCEED:
dengyihao's avatar
dengyihao 已提交
58 59 60
      if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_SUCCEED &&
          newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_FAILED &&
          newStatus != JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
61 62
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
dengyihao's avatar
dengyihao 已提交
63

D
dapan1121 已提交
64 65
      break;
    case JOB_TASK_STATUS_SUCCEED:
dengyihao's avatar
dengyihao 已提交
66 67
      if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING &&
          newStatus != JOB_TASK_STATUS_FAILED) {
D
dapan1121 已提交
68 69 70 71
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }

      break;
D
dapan1121 已提交
72
    case JOB_TASK_STATUS_FAILED:
D
dapan1121 已提交
73 74 75 76
      if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;
H
Haojun Liao 已提交
77

D
dapan1121 已提交
78 79 80 81
    case JOB_TASK_STATUS_CANCELLING:
      if (newStatus != JOB_TASK_STATUS_CANCELLED) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
dengyihao's avatar
dengyihao 已提交
82

D
dapan1121 已提交
83 84
      break;
    case JOB_TASK_STATUS_CANCELLED:
D
dapan1121 已提交
85
    case JOB_TASK_STATUS_DROPPING:
D
dapan1121 已提交
86 87 88
      if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
D
dapan1121 已提交
89
      break;
dengyihao's avatar
dengyihao 已提交
90

D
dapan1121 已提交
91
    default:
D
dapan1121 已提交
92
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
D
dapan1121 已提交
93 94 95
      return TSDB_CODE_QRY_APP_ERROR;
  }

D
dapan1121 已提交
96
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
97

D
dapan1121 已提交
98
_return:
D
dapan1121 已提交
99

D
dapan1121 已提交
100
  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
D
dapan1121 已提交
101
  QW_RET(code);
D
dapan1121 已提交
102 103
}

dengyihao's avatar
dengyihao 已提交
104
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {}
D
dapan1121 已提交
105

D
dapan1121 已提交
106
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
D
dapan1121 已提交
107 108 109 110 111
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);
dengyihao's avatar
dengyihao 已提交
112

D
dapan1121 已提交
113 114
  QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));

dengyihao's avatar
dengyihao 已提交
115 116 117
  void *        key = NULL;
  size_t        keyLen = 0;
  int32_t       i = 0;
D
dapan1121 已提交
118 119 120 121 122 123 124 125 126 127 128 129 130 131
  SQWSchStatus *sch = NULL;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
    sch = (SQWSchStatus *)pIter;
    qwDbgDumpSchInfo(sch, i);
    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));
}
D
dapan1121 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168

char *qwPhaseStr(int32_t phase) {
  switch (phase) {
    case QW_PHASE_PRE_QUERY:
      return "PRE_QUERY";
    case QW_PHASE_POST_QUERY:
      return "POST_QUERY";
    case QW_PHASE_PRE_FETCH:
      return "PRE_FETCH";
    case QW_PHASE_POST_FETCH:
      return "POST_FETCH";
    case QW_PHASE_PRE_CQUERY:
      return "PRE_CQUERY";
    case QW_PHASE_POST_CQUERY:
      return "POST_CQUERY";
    default:
      break;
  }

  return "UNKNOWN";
}

char *qwBufStatusStr(int32_t bufStatus) {
  switch (bufStatus) {
    case DS_BUF_LOW:
      return "LOW";
    case DS_BUF_FULL:
      return "FULL";
    case DS_BUF_EMPTY:
      return "EMPTY";
    default:
      break;
  }

  return "UNKNOWN";
}

D
dapan1121 已提交
169
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
D
dapan1121 已提交
170
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
171 172
  int8_t  origStatus = 0;
  bool    ignore = false;
D
dapan1121 已提交
173 174 175

  while (true) {
    origStatus = atomic_load_8(&task->status);
dengyihao's avatar
dengyihao 已提交
176

D
dapan1121 已提交
177 178 179 180
    QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore));
    if (ignore) {
      break;
    }
dengyihao's avatar
dengyihao 已提交
181

D
dapan1121 已提交
182 183
    if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) {
      continue;
D
dapan1121 已提交
184
    }
dengyihao's avatar
dengyihao 已提交
185

D
dapan1121 已提交
186
    QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(origStatus), jobTaskStatusStr(status));
D
dapan1121 已提交
187 188

    break;
D
dapan1121 已提交
189
  }
dengyihao's avatar
dengyihao 已提交
190

D
dapan1121 已提交
191 192 193
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
194
int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) {
D
dapan1121 已提交
195
  SQWSchStatus newSch = {0};
dengyihao's avatar
dengyihao 已提交
196 197
  newSch.tasksHash =
      taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
D
dapan1121 已提交
198
  if (NULL == newSch.tasksHash) {
D
dapan1121 已提交
199 200
    QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
201 202
  }

D
dapan1121 已提交
203 204 205 206 207
  QW_LOCK(QW_WRITE, &mgmt->schLock);
  int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      QW_UNLOCK(QW_WRITE, &mgmt->schLock);
dengyihao's avatar
dengyihao 已提交
208

D
dapan1121 已提交
209 210 211
      QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
      taosHashCleanup(newSch.tasksHash);
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
212
    }
D
dapan1121 已提交
213 214

    taosHashCleanup(newSch.tasksHash);
D
dapan1121 已提交
215
  }
D
dapan1121 已提交
216
  QW_UNLOCK(QW_WRITE, &mgmt->schLock);
D
dapan1121 已提交
217

dengyihao's avatar
dengyihao 已提交
218
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
219 220
}

D
dapan1121 已提交
221
int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
D
dapan1121 已提交
222 223 224 225 226
  while (true) {
    QW_LOCK(rwType, &mgmt->schLock);
    *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
    if (NULL == (*sch)) {
      QW_UNLOCK(rwType, &mgmt->schLock);
dengyihao's avatar
dengyihao 已提交
227

D
dapan1121 已提交
228
      if (QW_NOT_EXIST_ADD == nOpt) {
D
dapan1121 已提交
229
        QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));
D
dapan1121 已提交
230 231

        nOpt = QW_NOT_EXIST_RET_ERR;
dengyihao's avatar
dengyihao 已提交
232

D
dapan1121 已提交
233 234 235 236
        continue;
      } else if (QW_NOT_EXIST_RET_ERR == nOpt) {
        QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
      } else {
D
dapan1121 已提交
237
        QW_SCH_ELOG("unknown notExistOpt:%d", nOpt);
D
dapan1121 已提交
238
        QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
239
      }
D
dapan1121 已提交
240
    }
D
dapan1121 已提交
241 242

    break;
D
dapan1121 已提交
243 244 245 246 247
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
248
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
D
dapan1121 已提交
249
  return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
D
dapan1121 已提交
250 251
}

D
dapan1121 已提交
252
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
D
dapan1121 已提交
253
  return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
D
dapan1121 已提交
254 255
}

D
dapan1121 已提交
256
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); }
D
dapan1121 已提交
257

D
dapan1121 已提交
258
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
D
dapan1121 已提交
259 260 261 262 263 264 265 266 267 268 269 270 271
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

  QW_LOCK(rwType, &sch->tasksLock);
  *task = taosHashGet(sch->tasksHash, id, sizeof(id));
  if (NULL == (*task)) {
    QW_UNLOCK(rwType, &sch->tasksLock);
    QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
272
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
D
dapan1121 已提交
273 274
  int32_t code = 0;

D
dapan1121 已提交
275 276
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
D
dapan1121 已提交
277

D
dapan1121 已提交
278 279
  SQWTaskStatus ntask = {0};
  ntask.status = status;
D
dapan1121 已提交
280
  ntask.refId = rId;
D
dapan1121 已提交
281 282 283 284 285 286

  QW_LOCK(QW_WRITE, &sch->tasksLock);
  code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
  if (0 != code) {
    QW_UNLOCK(QW_WRITE, &sch->tasksLock);
    if (HASH_NODE_EXIST(code)) {
D
dapan1121 已提交
287 288
      if (rwType && task) {
        QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
D
dapan1121 已提交
289
      } else {
D
dapan1121 已提交
290
        QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
291
        QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
D
dapan1121 已提交
292 293
      }
    } else {
D
dapan1121 已提交
294
      QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
295
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
296 297 298
    }
  }
  QW_UNLOCK(QW_WRITE, &sch->tasksLock);
D
dapan1121 已提交
299

D
dapan1121 已提交
300 301
  QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status));

D
dapan1121 已提交
302 303
  if (rwType && task) {
    QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
D
dapan1121 已提交
304 305
  }

D
dapan1121 已提交
306 307
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
308

D
dapan1121 已提交
309
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
D
dapan1121 已提交
310
  SQWSchStatus *tsch = NULL;
dengyihao's avatar
dengyihao 已提交
311
  int32_t       code = 0;
D
dapan1121 已提交
312
  QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch));
D
dapan1121 已提交
313

D
dapan1121 已提交
314
  QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL));
D
dapan1121 已提交
315 316 317 318

_return:

  qwReleaseScheduler(QW_READ, mgmt);
dengyihao's avatar
dengyihao 已提交
319

D
dapan1121 已提交
320
  QW_RET(code);
D
dapan1121 已提交
321 322
}

dengyihao's avatar
dengyihao 已提交
323 324
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status,
                               SQWTaskStatus **task) {
D
dapan1121 已提交
325
  return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
D
dapan1121 已提交
326
}
D
dapan1121 已提交
327

dengyihao's avatar
dengyihao 已提交
328
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { QW_UNLOCK(rwType, &sch->tasksLock); }
D
dapan1121 已提交
329

D
dapan1121 已提交
330
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
D
dapan1121 已提交
331 332
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
H
Haojun Liao 已提交
333

D
dapan1121 已提交
334
  *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
D
dapan1121 已提交
335
  if (NULL == (*ctx)) {
D
dapan1121 已提交
336
    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
D
dapan1121 已提交
337
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
338 339 340 341 342
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
343
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
D
dapan1121 已提交
344 345
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
dengyihao's avatar
dengyihao 已提交
346

D
dapan1121 已提交
347 348
  *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == (*ctx)) {
D
dapan1121 已提交
349
    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
D
dapan1121 已提交
350
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
351 352 353 354 355
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
356
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
D
dapan1121 已提交
357 358 359
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

D
dapan1121 已提交
360
  SQWTaskCtx nctx = {0};
D
dapan1121 已提交
361

D
dapan1121 已提交
362
  int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
D
dapan1121 已提交
363 364
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
D
dapan1121 已提交
365
      if (acquire && ctx) {
D
dapan1121 已提交
366
        QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
367 368
      } else if (ctx) {
        QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
369
      } else {
D
dapan1121 已提交
370
        QW_TASK_ELOG_E("task ctx already exist");
D
dapan1121 已提交
371 372 373
        QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
      }
    } else {
D
dapan1121 已提交
374
      QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code);
D
dapan1121 已提交
375 376 377 378
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

D
dapan1121 已提交
379
  if (acquire && ctx) {
D
dapan1121 已提交
380
    QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
381 382
  } else if (ctx) {
    QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
383 384 385 386 387
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
388
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL)); }
D
dapan1121 已提交
389

dengyihao's avatar
dengyihao 已提交
390
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); }
D
dapan1121 已提交
391

D
dapan1121 已提交
392
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
D
dapan1121 已提交
393

dengyihao's avatar
dengyihao 已提交
394
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
D
dapan1121 已提交
395
  // Note: free/kill may in RC
D
dapan1121 已提交
396 397 398
  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
  if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
    qDestroyTask(otaskHandle);
D
dapan1121 已提交
399 400
  }
}
D
dapan1121 已提交
401

D
dapan1121 已提交
402 403
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  int32_t code = 0;
D
dapan1121 已提交
404
  // Note: free/kill may in RC
D
dapan1121 已提交
405 406
  qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
  if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
D
dapan1121 已提交
407
    code = qAsyncKillTask(taskHandle);
D
dapan1121 已提交
408
    atomic_store_ptr(&ctx->taskHandle, taskHandle);
D
dapan1121 已提交
409
  }
D
dapan1121 已提交
410

D
dapan1121 已提交
411 412 413 414
  QW_RET(code);
}

void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
415
  tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
D
dapan1121 已提交
416
  ctx->ctrlConnInfo.handle = NULL;
dengyihao's avatar
dengyihao 已提交
417
  ctx->ctrlConnInfo.refId = -1;
D
dapan1121 已提交
418 419

  // NO need to release dataConnInfo
D
dapan1121 已提交
420

D
dapan1121 已提交
421
  qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
dengyihao's avatar
dengyihao 已提交
422

D
dapan1121 已提交
423 424 425
  if (ctx->sinkHandle) {
    dsDestroyDataSinker(ctx->sinkHandle);
    ctx->sinkHandle = NULL;
D
dapan1121 已提交
426
  }
D
dapan1121 已提交
427 428 429 430 431

  if (ctx->plan) {
    nodesDestroyNode(ctx->plan);
    ctx->plan = NULL;
  }
D
dapan1121 已提交
432
}
D
dapan1121 已提交
433

D
dapan1121 已提交
434
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
D
dapan1121 已提交
435 436 437
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
  SQWTaskCtx octx;
D
dapan1121 已提交
438

D
dapan1121 已提交
439 440
  SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == ctx) {
D
dapan1121 已提交
441
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
442
  }
D
dapan1121 已提交
443

D
dapan1121 已提交
444
  octx = *ctx;
D
dapan1121 已提交
445

D
dapan1121 已提交
446 447
  atomic_store_ptr(&ctx->taskHandle, NULL);
  atomic_store_ptr(&ctx->sinkHandle, NULL);
D
dapan1121 已提交
448
  atomic_store_ptr(&ctx->plan, NULL);
D
dapan1121 已提交
449

D
dapan1121 已提交
450 451
  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);

D
dapan1121 已提交
452
  if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
dengyihao's avatar
dengyihao 已提交
453
    QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
D
dapan1121 已提交
454
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
455 456
  }

D
dapan1121 已提交
457
  qwFreeTask(QW_FPARAMS(), &octx);
D
dapan1121 已提交
458 459

  QW_TASK_DLOG_E("task ctx dropped");
dengyihao's avatar
dengyihao 已提交
460

D
dapan1121 已提交
461 462 463
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
464
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
dengyihao's avatar
dengyihao 已提交
465
  SQWSchStatus * sch = NULL;
D
dapan1121 已提交
466
  SQWTaskStatus *task = NULL;
dengyihao's avatar
dengyihao 已提交
467 468
  int32_t        code = 0;

D
dapan1121 已提交
469 470
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
D
dapan1121 已提交
471

D
dapan1121 已提交
472
  if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
D
dapan1121 已提交
473
    QW_TASK_WLOG_E("scheduler does not exist");
D
dapan1121 已提交
474 475
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
476

D
dapan1121 已提交
477 478
  if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
    qwReleaseScheduler(QW_WRITE, mgmt);
dengyihao's avatar
dengyihao 已提交
479

D
dapan1121 已提交
480
    QW_TASK_WLOG_E("task does not exist");
D
dapan1121 已提交
481 482
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
483

D
dapan1121 已提交
484
  if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
D
dapan1121 已提交
485
    QW_TASK_ELOG_E("taosHashRemove task from hash failed");
D
dapan1121 已提交
486 487 488
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
489
  QW_TASK_DLOG_E("task status dropped");
D
dapan1121 已提交
490 491 492

_return:

D
dapan1121 已提交
493 494 495
  if (task) {
    qwReleaseTaskStatus(QW_WRITE, sch);
  }
D
dapan1121 已提交
496
  qwReleaseScheduler(QW_WRITE, mgmt);
dengyihao's avatar
dengyihao 已提交
497

D
dapan1121 已提交
498 499 500
  QW_RET(code);
}

D
dapan1121 已提交
501
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
dengyihao's avatar
dengyihao 已提交
502
  SQWSchStatus * sch = NULL;
D
dapan1121 已提交
503
  SQWTaskStatus *task = NULL;
dengyihao's avatar
dengyihao 已提交
504
  int32_t        code = 0;
D
dapan1121 已提交
505

D
dapan1121 已提交
506
  QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
D
dapan1121 已提交
507
  QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
D
dapan1121 已提交
508

D
dapan1121 已提交
509
  QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));
dengyihao's avatar
dengyihao 已提交
510

D
dapan1121 已提交
511 512
_return:

D
dapan1121 已提交
513 514 515
  if (task) {
    qwReleaseTaskStatus(QW_READ, sch);
  }
D
dapan1121 已提交
516
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
517 518 519 520

  QW_RET(code);
}

D
dapan1121 已提交
521
int32_t qwDropTask(QW_FPARAMS_DEF) {
H
Haojun Liao 已提交
522
  QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
D
dapan1121 已提交
523
  QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
H
Haojun Liao 已提交
524

D
dapan1121 已提交
525 526
  QW_TASK_DLOG_E("task is dropped");

D
dapan1121 已提交
527 528 529
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
530
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
dengyihao's avatar
dengyihao 已提交
531
  qTaskInfo_t *taskHandle = &ctx->taskHandle;
D
dapan1121 已提交
532 533 534 535

  if (TASK_TYPE_TEMP == ctx->taskType) {
    if (ctx->explain) {
      SExplainExecInfo *execInfo = NULL;
dengyihao's avatar
dengyihao 已提交
536
      int32_t           resNum = 0;
D
dapan1121 已提交
537
      QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));
538
      QW_ERR_RET(qwBuildAndSendExplainRsp(&ctx->ctrlConnInfo, execInfo, resNum));
D
dapan1121 已提交
539
    }
dengyihao's avatar
dengyihao 已提交
540

D
dapan1121 已提交
541 542 543 544 545 546
    qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
547
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
dengyihao's avatar
dengyihao 已提交
548 549 550 551 552 553 554
  int32_t        code = 0;
  bool           qcontinue = true;
  SSDataBlock *  pRes = NULL;
  uint64_t       useconds = 0;
  int32_t        i = 0;
  int32_t        execNum = 0;
  qTaskInfo_t *  taskHandle = &ctx->taskHandle;
D
dapan1121 已提交
555
  DataSinkHandle sinkHandle = ctx->sinkHandle;
dengyihao's avatar
dengyihao 已提交
556

D
dapan1121 已提交
557
  while (true) {
H
Haojun Liao 已提交
558
    QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
D
dapan1121 已提交
559

D
dapan1121 已提交
560
    code = qExecTask(*taskHandle, &pRes, &useconds);
D
dapan1121 已提交
561
    if (code) {
D
dapan1121 已提交
562 563
      QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
D
dapan1121 已提交
564 565
    }

D
dapan1121 已提交
566 567
    ++execNum;

D
dapan1121 已提交
568
    if (NULL == pRes) {
dengyihao's avatar
dengyihao 已提交
569
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
H
Haojun Liao 已提交
570

D
dapan1121 已提交
571
      dsEndPut(sinkHandle, useconds);
D
dapan1121 已提交
572 573

      QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
D
dapan1121 已提交
574 575 576 577

      if (queryEnd) {
        *queryEnd = true;
      }
dengyihao's avatar
dengyihao 已提交
578

D
dapan1121 已提交
579 580 581
      break;
    }

D
dapan1121 已提交
582
    int32_t rows = pRes->info.rows;
H
Haojun Liao 已提交
583

584 585
    ASSERT(pRes->info.rows > 0);

H
Haojun Liao 已提交
586
    SInputData inputData = {.pData = pRes};
D
dapan1121 已提交
587 588
    code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
    if (code) {
D
dapan1121 已提交
589 590
      QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
D
dapan1121 已提交
591
    }
D
dapan1121 已提交
592

D
dapan1121 已提交
593
    QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);
dengyihao's avatar
dengyihao 已提交
594

D
dapan1121 已提交
595 596 597 598
    if (!qcontinue) {
      break;
    }

D
dapan1121 已提交
599 600 601 602 603
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
      break;
    }

    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
604 605
      break;
    }
D
dapan1121 已提交
606

D
dapan1121 已提交
607 608 609
    if (atomic_load_32(&ctx->rspCode)) {
      break;
    }
D
dapan1121 已提交
610 611 612 613
  }

  QW_RET(code);
}
D
dapan1121 已提交
614

D
dapan1121 已提交
615
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
D
dapan1121 已提交
616 617
  int32_t taskNum = 0;

D
dapan1121 已提交
618
  hbInfo->connInfo = sch->hbConnInfo;
D
dapan1121 已提交
619
  hbInfo->rsp.epId = sch->hbEpId;
D
dapan1121 已提交
620

D
dapan1121 已提交
621
  QW_LOCK(QW_READ, &sch->tasksLock);
dengyihao's avatar
dengyihao 已提交
622

D
dapan1121 已提交
623
  taskNum = taosHashGetSize(sch->tasksHash);
D
dapan1121 已提交
624 625 626

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
D
dapan1121 已提交
627
    QW_UNLOCK(QW_READ, &sch->tasksLock);
D
dapan1121 已提交
628
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
D
dapan1121 已提交
629 630 631
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

dengyihao's avatar
dengyihao 已提交
632 633 634
  void *      key = NULL;
  size_t      keyLen = 0;
  int32_t     i = 0;
D
dapan1121 已提交
635
  STaskStatus status = {0};
D
dapan1121 已提交
636 637 638 639

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
D
dapan1121 已提交
640
    key = taosHashGetKey(pIter, &keyLen);
D
dapan1121 已提交
641

dengyihao's avatar
dengyihao 已提交
642
    // TODO GET EXECUTOR API TO GET MORE INFO
D
dapan1121 已提交
643

D
dapan1121 已提交
644 645 646
    QW_GET_QTID(key, status.queryId, status.taskId);
    status.status = taskStatus->status;
    status.refId = taskStatus->refId;
dengyihao's avatar
dengyihao 已提交
647

D
dapan1121 已提交
648
    taosArrayPush(hbInfo->rsp.taskStatus, &status);
dengyihao's avatar
dengyihao 已提交
649

D
dapan1121 已提交
650 651
    ++i;
    pIter = taosHashIterate(sch->tasksHash, pIter);
dengyihao's avatar
dengyihao 已提交
652
  }
D
dapan1121 已提交
653 654 655 656 657 658

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
659
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
dengyihao's avatar
dengyihao 已提交
660
  int32_t            len = 0;
D
dapan1121 已提交
661
  SRetrieveTableRsp *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
662 663
  bool               queryEnd = false;
  int32_t            code = 0;
D
dapan1121 已提交
664

D
dapan1121 已提交
665
  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
D
dapan1121 已提交
666

D
dapan1121 已提交
667 668 669 670
  if (len < 0) {
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
D
dapan1121 已提交
671

D
dapan1121 已提交
672 673
  if (len == 0) {
    if (queryEnd) {
D
dapan 已提交
674
      code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
675
      if (code) {
D
dapan1121 已提交
676
        QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
677 678
        QW_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
679

D
dapan1121 已提交
680
      QW_TASK_DLOG_E("no data in sink and query end");
H
Haojun Liao 已提交
681

D
dapan1121 已提交
682
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
L
Liu Jicong 已提交
683
      QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
684

D
dapan1121 已提交
685
      *rspMsg = rsp;
D
dapan 已提交
686
      *dataLen = 0;
D
dapan1121 已提交
687 688
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
689 690

    pOutput->bufStatus = DS_BUF_EMPTY;
D
dapan1121 已提交
691

D
dapan1121 已提交
692
    return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
693
  }
D
dapan1121 已提交
694

D
dapan1121 已提交
695
  // Got data from sink
D
dapan1121 已提交
696
  QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
D
dapan1121 已提交
697

D
dapan 已提交
698
  *dataLen = len;
dengyihao's avatar
dengyihao 已提交
699

D
dapan1121 已提交
700
  QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
D
dapan 已提交
701
  *rspMsg = rsp;
dengyihao's avatar
dengyihao 已提交
702

D
dapan 已提交
703 704
  pOutput->pData = rsp->data;
  code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
705
  if (code) {
D
dapan1121 已提交
706
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
707 708
    QW_ERR_RET(code);
  }
D
dapan1121 已提交
709

D
dapan1121 已提交
710
  if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
D
dapan1121 已提交
711
    QW_TASK_DLOG_E("task all data fetched, done");
D
dapan1121 已提交
712
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
D
dapan1121 已提交
713 714
  }

D
dapan1121 已提交
715
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
716 717
}

D
dapan1121 已提交
718
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
dengyihao's avatar
dengyihao 已提交
719 720
  int32_t      code = 0;
  SQWTaskCtx * ctx = NULL;
S
Shengliang Guan 已提交
721 722
  SRpcHandleInfo *dropConnection = NULL;
  SRpcHandleInfo *cancelConnection = NULL;
D
dapan1121 已提交
723

D
dapan1121 已提交
724
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
725 726 727 728 729 730

  if (QW_PHASE_PRE_QUERY == phase) {
    QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
  } else {
    QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
  }
dengyihao's avatar
dengyihao 已提交
731

D
dapan1121 已提交
732
  QW_LOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
733

D
dapan1121 已提交
734
  if (QW_PHASE_PRE_FETCH == phase) {
dengyihao's avatar
dengyihao 已提交
735
    atomic_store_8((int8_t *)&ctx->queryFetched, true);
D
dapan1121 已提交
736
  } else {
D
dapan1121 已提交
737 738
    atomic_store_8(&ctx->phase, phase);
  }
D
dapan1121 已提交
739

dengyihao's avatar
dengyihao 已提交
740
  if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
D
dapan1121 已提交
741 742 743
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }
D
dapan1121 已提交
744

D
dapan1121 已提交
745 746
  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
D
dapan1121 已提交
747
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
H
Haojun Liao 已提交
748
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
749
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
750 751
        break;
      }
D
dapan1121 已提交
752

D
dapan1121 已提交
753
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
754
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
755
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
756
        dropConnection = NULL;
757

D
dapan1121 已提交
758 759
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
760

D
dapan1121 已提交
761
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan 已提交
762
        break;
D
dapan1121 已提交
763
      }
D
dapan1121 已提交
764

D
dapan1121 已提交
765
      QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
D
dapan1121 已提交
766 767
      break;
    }
D
dapan1121 已提交
768
    case QW_PHASE_PRE_FETCH: {
D
dapan1121 已提交
769 770
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
771 772
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }
D
dapan1121 已提交
773

D
dapan1121 已提交
774
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
775
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
776 777 778 779
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

      if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
780
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
781
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
D
dapan1121 已提交
782 783
      }
      break;
dengyihao's avatar
dengyihao 已提交
784
    }
D
dapan1121 已提交
785 786
    case QW_PHASE_PRE_CQUERY: {
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
787
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
788
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
789
      }
D
dapan1121 已提交
790

D
dapan1121 已提交
791
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
792
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
793
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
794
        dropConnection = NULL;
H
Haojun Liao 已提交
795

D
dapan1121 已提交
796 797
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
798

D
dapan1121 已提交
799
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
800
      }
D
dapan1121 已提交
801

D
dapan1121 已提交
802
      break;
D
dapan1121 已提交
803 804 805 806 807 808 809
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
810
    QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode));
D
dapan1121 已提交
811
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
812
  }
D
dapan1121 已提交
813

D
dapan1121 已提交
814 815
_return:
  if (ctx) {
D
dapan1121 已提交
816
    QW_UPDATE_RSP_CODE(ctx, code);
dengyihao's avatar
dengyihao 已提交
817

D
dapan1121 已提交
818
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
819 820
    qwReleaseTaskCtx(mgmt, ctx);
  }
D
dapan1121 已提交
821

D
dapan1121 已提交
822
  if (dropConnection) {
S
shm  
Shengliang Guan 已提交
823
    qwBuildAndSendDropRsp(dropConnection, code);
D
dapan1121 已提交
824
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
825
  }
D
dapan1121 已提交
826

D
dapan1121 已提交
827
  if (cancelConnection) {
S
shm  
Shengliang Guan 已提交
828
    qwBuildAndSendCancelRsp(cancelConnection, code);
D
dapan1121 已提交
829
    QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
830 831
  }

832 833 834 835 836
  if (code != TSDB_CODE_SUCCESS) {
    QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  } else {
    QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  }
D
dapan1121 已提交
837 838 839 840 841

  QW_RET(code);
}

int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
S
Shengliang Guan 已提交
842 843 844 845
  int32_t         code = 0;
  SQWTaskCtx     *ctx = NULL;
  SRpcHandleInfo  connInfo = {0};
  SRpcHandleInfo *readyConnection = NULL;
D
dapan1121 已提交
846

D
dapan1121 已提交
847
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
dengyihao's avatar
dengyihao 已提交
848

D
dapan1121 已提交
849
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
850

D
dapan1121 已提交
851 852 853
  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
854
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
855 856 857 858
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (QW_PHASE_POST_QUERY == phase) {
D
dapan1121 已提交
859
#if 0    
D
dapan1121 已提交
860
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
861
      readyConnection = &ctx->connInfo;
D
dapan1121 已提交
862 863
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
    }
D
dapan1121 已提交
864
#else
865
    connInfo = ctx->ctrlConnInfo;
D
dapan1121 已提交
866
    readyConnection = &connInfo;
dengyihao's avatar
dengyihao 已提交
867

D
dapan1121 已提交
868 869
    QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
#endif
D
dapan1121 已提交
870 871
  }

D
dapan1121 已提交
872
  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
873 874 875 876
    if (QW_PHASE_POST_FETCH == phase) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }
D
dapan1121 已提交
877

D
dapan1121 已提交
878 879
    qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
880

D
dapan1121 已提交
881 882
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
883 884 885
  }

  if (ctx->rspCode) {
dengyihao's avatar
dengyihao 已提交
886 887
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
D
dapan1121 已提交
888
    QW_ERR_JRET(ctx->rspCode);
dengyihao's avatar
dengyihao 已提交
889
  }
D
dapan1121 已提交
890

D
dapan1121 已提交
891
  QW_ERR_JRET(input->code);
D
dapan1121 已提交
892 893 894

_return:

D
dapan1121 已提交
895 896 897 898
  if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
  }

D
dapan1121 已提交
899
  if (ctx) {
D
dapan1121 已提交
900
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
901

D
dapan1121 已提交
902 903 904
    if (QW_PHASE_POST_FETCH != phase) {
      atomic_store_8(&ctx->phase, phase);
    }
dengyihao's avatar
dengyihao 已提交
905

D
dapan1121 已提交
906
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
907
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
908 909
  }

D
dapan1121 已提交
910
  if (readyConnection) {
dengyihao's avatar
dengyihao 已提交
911
    qwBuildAndSendReadyRsp(readyConnection, code);
D
dapan1121 已提交
912
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
913 914
  }

D
dapan1121 已提交
915 916
  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
D
dapan1121 已提交
917 918
  }

D
dapan1121 已提交
919
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
920

D
dapan1121 已提交
921 922 923
  QW_RET(code);
}

D
dapan1121 已提交
924
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
dengyihao's avatar
dengyihao 已提交
925 926
  int32_t          code = 0;
  bool             queryRsped = false;
D
dapan1121 已提交
927
  SSubplan*        plan = NULL;
dengyihao's avatar
dengyihao 已提交
928 929 930 931
  SQWPhaseInput    input = {0};
  qTaskInfo_t      pTaskInfo = NULL;
  DataSinkHandle   sinkHandle = NULL;
  SQWTaskCtx *     ctx = NULL;
D
dapan1121 已提交
932

D
dapan1121 已提交
933
  QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
D
dapan1121 已提交
934

D
dapan1121 已提交
935
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
D
dapan1121 已提交
936 937

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
938

D
dapan1121 已提交
939
  atomic_store_8(&ctx->taskType, taskType);
D
dapan1121 已提交
940
  atomic_store_8(&ctx->explain, explain);
X
Xiaoyu Wang 已提交
941

942
  ctx->ctrlConnInfo = qwMsg->connInfo;
D
dapan1121 已提交
943 944

  QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
X
Xiaoyu Wang 已提交
945

D
dapan1121 已提交
946 947
  code = qStringToSubplan(qwMsg->msg, &plan);
  if (TSDB_CODE_SUCCESS != code) {
948 949
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
950
    QW_ERR_JRET(code);
D
dapan1121 已提交
951
  }
dengyihao's avatar
dengyihao 已提交
952

D
dapan1121 已提交
953 954
  ctx->plan = plan;

955
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
956
  if (code) {
D
dapan1121 已提交
957
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
958
    QW_ERR_JRET(code);
D
dapan1121 已提交
959
  }
D
dapan1121 已提交
960

H
Haojun Liao 已提交
961
  if (NULL == sinkHandle || NULL == pTaskInfo) {
D
dapan1121 已提交
962 963 964 965
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

dengyihao's avatar
dengyihao 已提交
966 967
  // QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
  // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
968

dengyihao's avatar
dengyihao 已提交
969
  // queryRsped = true;
D
dapan1121 已提交
970

D
dapan1121 已提交
971 972 973
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

D
dapan1121 已提交
974
  if (pTaskInfo && sinkHandle) {
D
dapan1121 已提交
975
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
D
dapan1121 已提交
976
  }
dengyihao's avatar
dengyihao 已提交
977

D
dapan1121 已提交
978 979
_return:

D
dapan1121 已提交
980 981
  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
982

dengyihao's avatar
dengyihao 已提交
983
  // if (!queryRsped) {
D
dapan1121 已提交
984 985 986
  //  qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
  //  QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  //}
D
dapan1121 已提交
987

D
dapan1121 已提交
988
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
989 990
}

D
dapan1121 已提交
991
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
992
  int32_t     code = 0;
D
dapan1121 已提交
993
  SQWTaskCtx *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
994 995
  int8_t      phase = 0;
  bool        needRsp = true;
D
dapan1121 已提交
996 997

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
998

D
dapan1121 已提交
999 1000
  QW_LOCK(QW_WRITE, &ctx->lock);

D
dapan1121 已提交
1001 1002 1003
  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG_E("task is dropping or already dropped");
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
1004
  }
dengyihao's avatar
dengyihao 已提交
1005

D
dapan1121 已提交
1006
  if (ctx->phase == QW_PHASE_PRE_QUERY) {
1007
    ctx->ctrlConnInfo = qwMsg->connInfo;
D
dapan1121 已提交
1008
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
D
dapan1121 已提交
1009 1010 1011 1012 1013 1014 1015
    needRsp = false;
    QW_TASK_DLOG_E("ready msg will not rsp now");
    goto _return;
  }

  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);

dengyihao's avatar
dengyihao 已提交
1016 1017 1018
  if (atomic_load_8((int8_t *)&ctx->queryEnd) || atomic_load_8((int8_t *)&ctx->queryFetched)) {
    QW_TASK_ELOG("got ready msg at wrong status, queryEnd:%d, queryFetched:%d", atomic_load_8((int8_t *)&ctx->queryEnd),
                 atomic_load_8((int8_t *)&ctx->queryFetched));
D
dapan1121 已提交
1019 1020 1021
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }

D
dapan1121 已提交
1022 1023 1024
  if (ctx->phase == QW_PHASE_POST_QUERY) {
    code = ctx->rspCode;
    goto _return;
D
dapan1121 已提交
1025 1026
  }

D
dapan1121 已提交
1027
  QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase));
H
Haojun Liao 已提交
1028

D
dapan1121 已提交
1029
  QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
1030 1031 1032

_return:

D
dapan1121 已提交
1033
  if (code && ctx) {
D
dapan1121 已提交
1034
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
1035 1036
  }

D
dapan1121 已提交
1037 1038 1039
  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
  }
H
Haojun Liao 已提交
1040

D
dapan1121 已提交
1041 1042
  if (ctx) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
1043
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
1044 1045
  }

D
dapan1121 已提交
1046
  if (needRsp) {
S
shm  
Shengliang Guan 已提交
1047
    qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
D
dapan1121 已提交
1048
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
1049 1050
  }

D
dapan1121 已提交
1051
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1052 1053
}

D
dapan1121 已提交
1054
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
1055 1056
  SQWTaskCtx *  ctx = NULL;
  int32_t       code = 0;
1057
  SQWPhaseInput input = {0};
dengyihao's avatar
dengyihao 已提交
1058 1059 1060 1061
  void *        rsp = NULL;
  int32_t       dataLen = 0;
  bool          queryEnd = false;

D
dapan1121 已提交
1062
  do {
D
dapan1121 已提交
1063
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
D
dapan1121 已提交
1064

D
dapan1121 已提交
1065
    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1066

dengyihao's avatar
dengyihao 已提交
1067 1068
    atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t *)&ctx->queryContinue, 0);
D
dapan1121 已提交
1069

D
dapan1121 已提交
1070
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));
D
dapan1121 已提交
1071

D
dapan1121 已提交
1072 1073 1074
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      SOutputData sOutput = {0};
      QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
dengyihao's avatar
dengyihao 已提交
1075 1076

      if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
1077
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
dengyihao's avatar
dengyihao 已提交
1078 1079

        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
1080
      }
dengyihao's avatar
dengyihao 已提交
1081

D
dapan1121 已提交
1082
      if (rsp) {
D
dapan1121 已提交
1083
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
1084

D
dapan1121 已提交
1085
        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
1086
        if (qComplete) {
dengyihao's avatar
dengyihao 已提交
1087
          atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
1088
        }
H
Haojun Liao 已提交
1089

D
dapan1121 已提交
1090
        qwMsg->connInfo = ctx->dataConnInfo;
dengyihao's avatar
dengyihao 已提交
1091 1092 1093 1094 1095
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

        qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
                     tstrerror(code), dataLen);
D
dapan1121 已提交
1096
      } else {
dengyihao's avatar
dengyihao 已提交
1097
        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
1098 1099 1100
      }
    }

dengyihao's avatar
dengyihao 已提交
1101
  _return:
1102

D
dapan1121 已提交
1103 1104 1105 1106
    if (NULL == ctx) {
      break;
    }

D
dapan1121 已提交
1107
    if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
dengyihao's avatar
dengyihao 已提交
1108
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
1109 1110
      qwFreeFetchRsp(rsp);
      rsp = NULL;
dengyihao's avatar
dengyihao 已提交
1111

D
dapan1121 已提交
1112
      qwMsg->connInfo = ctx->dataConnInfo;
S
shm  
Shengliang Guan 已提交
1113
      qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
dengyihao's avatar
dengyihao 已提交
1114 1115
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                   0);
1116
    }
D
dapan1121 已提交
1117

D
dapan1121 已提交
1118
    QW_LOCK(QW_WRITE, &ctx->lock);
dengyihao's avatar
dengyihao 已提交
1119
    if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
D
dapan1121 已提交
1120
      // Note: if necessary, fetch need to put cquery to queue again
D
dapan1121 已提交
1121
      atomic_store_8(&ctx->phase, 0);
dengyihao's avatar
dengyihao 已提交
1122
      QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
1123 1124
      break;
    }
dengyihao's avatar
dengyihao 已提交
1125
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
1126
  } while (true);
D
dapan1121 已提交
1127

D
dapan1121 已提交
1128
  input.code = code;
dengyihao's avatar
dengyihao 已提交
1129
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);
D
dapan1121 已提交
1130

dengyihao's avatar
dengyihao 已提交
1131
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1132
}
D
dapan1121 已提交
1133

D
dapan1121 已提交
1134
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
1135 1136 1137 1138 1139
  int32_t       code = 0;
  int32_t       dataLen = 0;
  bool          locked = false;
  SQWTaskCtx *  ctx = NULL;
  void *        rsp = NULL;
D
dapan1121 已提交
1140
  SQWPhaseInput input = {0};
D
dapan1121 已提交
1141

D
dapan1121 已提交
1142
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
1143

1144
  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
1145

D
dapan 已提交
1146 1147
  SOutputData sOutput = {0};
  QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
D
dapan1121 已提交
1148

1149
  if (NULL == rsp) {
D
dapan1121 已提交
1150 1151
    atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle);
    atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle);
dengyihao's avatar
dengyihao 已提交
1152

1153
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
D
dapan1121 已提交
1154
  } else {
D
dapan1121 已提交
1155
    bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
1156

D
dapan1121 已提交
1157
    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
1158
    if (qComplete) {
dengyihao's avatar
dengyihao 已提交
1159
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
1160
    }
D
dapan1121 已提交
1161 1162
  }

dengyihao's avatar
dengyihao 已提交
1163
  if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
1164
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
D
dapan1121 已提交
1165

D
dapan1121 已提交
1166 1167
    QW_LOCK(QW_WRITE, &ctx->lock);
    locked = true;
1168

D
dapan1121 已提交
1169
    // RC WARNING
D
dapan1121 已提交
1170
    if (QW_IS_QUERY_RUNNING(ctx)) {
dengyihao's avatar
dengyihao 已提交
1171 1172
      atomic_store_8((int8_t *)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
H
Haojun Liao 已提交
1173
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);
D
dapan1121 已提交
1174

dengyihao's avatar
dengyihao 已提交
1175 1176
      atomic_store_8((int8_t *)&ctx->queryInQueue, 1);

D
dapan1121 已提交
1177
      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
1178
    }
D
dapan 已提交
1179
  }
dengyihao's avatar
dengyihao 已提交
1180

D
dapan1121 已提交
1181
_return:
D
dapan1121 已提交
1182

D
dapan1121 已提交
1183 1184 1185 1186 1187
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
D
dapan1121 已提交
1188
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
D
dapan1121 已提交
1189

D
dapan 已提交
1190 1191 1192
  if (code) {
    qwFreeFetchRsp(rsp);
    rsp = NULL;
D
dapan1121 已提交
1193
    dataLen = 0;
D
dapan1121 已提交
1194 1195 1196
  }

  if (code || rsp) {
S
shm  
Shengliang Guan 已提交
1197
    qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
dengyihao's avatar
dengyihao 已提交
1198 1199
    QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                 dataLen);
D
dapan1121 已提交
1200 1201
  }

D
dapan1121 已提交
1202
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1203
}
D
dapan1121 已提交
1204

D
dapan1121 已提交
1205
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
1206 1207
  int32_t     code = 0;
  bool        rsped = false;
D
dapan1121 已提交
1208
  SQWTaskCtx *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
1209
  bool        locked = false;
D
dapan1121 已提交
1210

D
dapan1121 已提交
1211 1212
  // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED

D
dapan1121 已提交
1213
  QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
1214

D
dapan1121 已提交
1215 1216 1217 1218 1219
  QW_LOCK(QW_WRITE, &ctx->lock);

  locked = true;

  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
1220
    QW_TASK_WLOG_E("task already dropping");
D
dapan1121 已提交
1221 1222 1223
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

D
dapan1121 已提交
1224
  if (QW_IS_QUERY_RUNNING(ctx)) {
D
dapan1121 已提交
1225
    QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
D
dapan1121 已提交
1226
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
D
dapan1121 已提交
1227
  } else if (ctx->phase > 0) {
D
dapan1121 已提交
1228 1229 1230 1231
    if (0 == qwMsg->code) {
      qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
      QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
    }
1232

D
dapan1121 已提交
1233
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
1234
    rsped = true;
D
dapan1121 已提交
1235 1236
  } else {
    // task not started
D
dapan1121 已提交
1237
  }
D
dapan1121 已提交
1238

D
dapan1121 已提交
1239
  if (!rsped) {
1240
    ctx->ctrlConnInfo = qwMsg->connInfo;
dengyihao's avatar
dengyihao 已提交
1241

D
dapan1121 已提交
1242 1243
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }
1244

D
dapan1121 已提交
1245
_return:
D
dapan1121 已提交
1246

D
dapan1121 已提交
1247
  if (code) {
D
dapan1121 已提交
1248 1249 1250
    if (ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }
H
Haojun Liao 已提交
1251

D
dapan1121 已提交
1252
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
D
dapan1121 已提交
1253 1254
  }

D
dapan 已提交
1255 1256 1257 1258
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

D
dapan1121 已提交
1259
  if (ctx) {
D
dapan1121 已提交
1260
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
1261 1262
  }

D
dapan1121 已提交
1263
  if ((TSDB_CODE_SUCCESS != code) && (0 == qwMsg->code)) {
S
shm  
Shengliang Guan 已提交
1264
    qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
D
dapan1121 已提交
1265
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
1266
  }
D
dapan1121 已提交
1267

D
dapan1121 已提交
1268
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1269
}
D
dapan1121 已提交
1270

D
dapan1121 已提交
1271
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
dengyihao's avatar
dengyihao 已提交
1272
  int32_t         code = 0;
D
dapan1121 已提交
1273
  SSchedulerHbRsp rsp = {0};
dengyihao's avatar
dengyihao 已提交
1274
  SQWSchStatus *  sch = NULL;
D
dapan1121 已提交
1275 1276 1277 1278 1279 1280

  QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

  QW_LOCK(QW_WRITE, &sch->hbConnLock);

  if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
1281
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
D
dapan1121 已提交
1282 1283 1284 1285 1286 1287 1288
    sch->hbConnInfo.handle = NULL;
    sch->hbConnInfo.ahandle = NULL;

    QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
  } else {
    QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
  }
dengyihao's avatar
dengyihao 已提交
1289

D
dapan1121 已提交
1290
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
dengyihao's avatar
dengyihao 已提交
1291

D
dapan1121 已提交
1292
  qwReleaseScheduler(QW_READ, mgmt);
dengyihao's avatar
dengyihao 已提交
1293

D
dapan1121 已提交
1294 1295 1296
  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
1297
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
dengyihao's avatar
dengyihao 已提交
1298
  int32_t         code = 0;
D
dapan1121 已提交
1299
  SSchedulerHbRsp rsp = {0};
dengyihao's avatar
dengyihao 已提交
1300
  SQWSchStatus *  sch = NULL;
D
dapan1121 已提交
1301

D
dapan1121 已提交
1302 1303 1304
  if (qwMsg->code) {
    QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
  }
D
dapan1121 已提交
1305 1306 1307

  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

D
dapan1121 已提交
1308 1309
  QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));

D
dapan1121 已提交
1310
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
1311

D
dapan1121 已提交
1312
  if (sch->hbConnInfo.handle) {
1313
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
1314
    sch->hbConnInfo.handle = NULL;
D
dapan1121 已提交
1315
  }
D
dapan1121 已提交
1316

D
dapan1121 已提交
1317
  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
D
dapan1121 已提交
1318
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
dengyihao's avatar
dengyihao 已提交
1319

D
dapan1121 已提交
1320
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
dengyihao's avatar
dengyihao 已提交
1321 1322 1323

  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
          req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
1324

D
dapan1121 已提交
1325 1326 1327 1328
  qwReleaseScheduler(QW_READ, mgmt);

_return:

D
dapan1121 已提交
1329 1330
  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));

S
shm  
Shengliang Guan 已提交
1331
  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
D
dapan1121 已提交
1332 1333

  if (code) {
1334
    tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
1335
    qwMsg->connInfo.handle = NULL;
D
dapan1121 已提交
1336
  }
dengyihao's avatar
dengyihao 已提交
1337

D
dapan1121 已提交
1338
  QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
dengyihao's avatar
dengyihao 已提交
1339

D
dapan1121 已提交
1340
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1341 1342 1343
}

void qwProcessHbTimerEvent(void *param, void *tmrId) {
D
dapan1121 已提交
1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356
  SQWHbParam* hbParam = (SQWHbParam*)param;
  if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
    return;
  }

  int64_t refId = hbParam->refId;
  SQWorker *mgmt = qwAcquire(refId);
  if (NULL == mgmt) {
    QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
    taosMemoryFree(param);
    return;
  }
  
D
dapan1121 已提交
1357
  SQWSchStatus *sch = NULL;
dengyihao's avatar
dengyihao 已提交
1358 1359 1360
  int32_t       taskNum = 0;
  SQWHbInfo *   rspList = NULL;
  int32_t       code = 0;
D
dapan1121 已提交
1361

D
dapan1121 已提交
1362 1363
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
1364 1365 1366 1367 1368
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1369
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1370
    qwRelease(refId);
D
dapan1121 已提交
1371
    return;
D
dapan1121 已提交
1372 1373
  }

wafwerar's avatar
wafwerar 已提交
1374
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
1375 1376
  if (NULL == rspList) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1377 1378
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1379
    qwRelease(refId);
D
dapan1121 已提交
1380
    return;
D
dapan1121 已提交
1381 1382
  }

dengyihao's avatar
dengyihao 已提交
1383 1384
  void *  key = NULL;
  size_t  keyLen = 0;
D
dapan1121 已提交
1385 1386 1387 1388
  int32_t i = 0;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
D
dapan1121 已提交
1389 1390 1391 1392 1393 1394 1395
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
    if (NULL == sch->hbConnInfo.handle) {
      uint64_t *sId = taosHashGetKey(pIter, NULL);
      QW_DLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
      pIter = taosHashIterate(mgmt->schHash, pIter);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1396

D
dapan1121 已提交
1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
1412
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
1413 1414
    QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),
            (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
D
dapan1121 已提交
1415
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
1416 1417
  }

wafwerar's avatar
wafwerar 已提交
1418
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
1419

dengyihao's avatar
dengyihao 已提交
1420
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1421 1422 1423 1424 1425 1426 1427 1428 1429 1430
  qwRelease(refId);  
}

void qwCloseRef(void) {
  taosWLockLatch(&gQwMgmt.lock);
  if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
    taosCloseRef(gQwMgmt.qwRef);
    gQwMgmt.qwRef= -1;
  }
  taosWUnLockLatch(&gQwMgmt.lock);
D
dapan1121 已提交
1431 1432
}

D
dapan1121 已提交
1433 1434 1435 1436
void qwDestroySchStatus(SQWSchStatus *pStatus) {
  taosHashCleanup(pStatus->tasksHash);
}

D
dapan1121 已提交
1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447
void qwDestroyImpl(void *pMgmt) {
  SQWorker *mgmt = (SQWorker *)pMgmt;

  taosTmrStopA(&mgmt->hbTimer);
  taosTmrCleanUp(mgmt->timer);

  // TODO STOP ALL QUERY

  // TODO FREE ALL

  taosHashCleanup(mgmt->ctxHash);
D
dapan1121 已提交
1448 1449 1450 1451 1452 1453 1454

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
    qwDestroySchStatus(sch);
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }  
D
dapan1121 已提交
1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478
  taosHashCleanup(mgmt->schHash);

  taosMemoryFree(mgmt);

  atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);  

  qwCloseRef();
}

int32_t qwOpenRef(void) {
  taosWLockLatch(&gQwMgmt.lock);
  if (gQwMgmt.qwRef < 0) {
    gQwMgmt.qwRef= taosOpenRef(100, qwDestroyImpl);
    if (gQwMgmt.qwRef < 0) {
      taosWUnLockLatch(&gQwMgmt.lock);
      qError("init qworker ref failed");
      QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }
  taosWUnLockLatch(&gQwMgmt.lock);
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
  int32_t paramIdx = 0;
  int32_t newParamIdx = 0;
  
  while (true) {
    paramIdx = atomic_load_32(&gQwMgmt.paramIdx);
    if (paramIdx == tListLen(gQwMgmt.param)) {
      newParamIdx = 0;
    } else {
      newParamIdx = paramIdx + 1;
    }
    
    if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) {
      break;
    }
  }

  gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef;
  gQwMgmt.param[paramIdx].refId = refId;

  *pParam = &gQwMgmt.param[paramIdx];
}
D
dapan1121 已提交
1501

S
Shengliang Guan 已提交
1502
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
1503
  if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
D
dapan1121 已提交
1504 1505 1506
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
S
Shengliang 已提交
1507

D
dapan1121 已提交
1508 1509 1510 1511
  int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
  if (1 == qwNum) {
    memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
  }
D
dapan1121 已提交
1512 1513 1514 1515 1516 1517 1518 1519

  int32_t code = qwOpenRef();
  if (code) {
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
    QW_RET(code);
  }

  SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
D
dapan1121 已提交
1520
  if (NULL == mgmt) {
D
dapan1121 已提交
1521 1522
    qError("calloc %d failed", (int32_t)sizeof(SQWorker));
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1523
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1524 1525 1526 1527
  }

  if (cfg) {
    mgmt->cfg = *cfg;
D
dapan1121 已提交
1528
    if (0 == mgmt->cfg.maxSchedulerNum) {
D
dapan1121 已提交
1529
      mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
D
dapan1121 已提交
1530 1531
    }
    if (0 == mgmt->cfg.maxTaskNum) {
D
dapan1121 已提交
1532
      mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
D
dapan1121 已提交
1533 1534
    }
    if (0 == mgmt->cfg.maxSchTaskNum) {
D
dapan1121 已提交
1535
      mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1536
    }
D
dapan1121 已提交
1537
  } else {
D
dapan1121 已提交
1538 1539 1540
    mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
    mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
    mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1541 1542
  }

dengyihao's avatar
dengyihao 已提交
1543 1544
  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                               HASH_ENTRY_LOCK);
D
dapan1121 已提交
1545
  if (NULL == mgmt->schHash) {
wafwerar's avatar
wafwerar 已提交
1546
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1547
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
D
dapan1121 已提交
1548
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1549 1550
  }

dengyihao's avatar
dengyihao 已提交
1551 1552
  mgmt->ctxHash =
      taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1553
  if (NULL == mgmt->ctxHash) {
D
dapan1121 已提交
1554
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
D
dapan1121 已提交
1555 1556 1557 1558 1559 1560 1561 1562 1563
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1564 1565
  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
S
Shengliang Guan 已提交
1566
  mgmt->msgCb = *pMsgCb;
D
dapan1121 已提交
1567

D
dapan1121 已提交
1568 1569 1570 1571 1572 1573
  mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
  if (mgmt->refId < 0) {
    qError("taosAddRef qw failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(terrno);
  }

D
dapan1121 已提交
1574 1575 1576
  SQWHbParam *param = NULL;
  qwSetHbParam(mgmt->refId, &param);

D
dapan1121 已提交
1577 1578 1579 1580 1581 1582
  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer);
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  
D
dapan1121 已提交
1583 1584
  *qWorkerMgmt = mgmt;

D
dapan1121 已提交
1585 1586
  qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);

D
dapan1121 已提交
1587
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1588 1589 1590

_return:

D
dapan1121 已提交
1591 1592 1593 1594 1595 1596 1597
  if (mgmt->refId >= 0) {
    qwRelease(mgmt->refId);
  } else {
    taosHashCleanup(mgmt->schHash);
    taosHashCleanup(mgmt->ctxHash);
    taosTmrCleanUp(mgmt->timer);
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1598

D
dapan1121 已提交
1599 1600 1601
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);    
  }
  
D
dapan1121 已提交
1602
  QW_RET(code);
D
dapan1121 已提交
1603
}
D
dapan1121 已提交
1604 1605 1606 1607

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1608
  }
D
dapan 已提交
1609

D
dapan1121 已提交
1610
  SQWorker *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1611

D
dapan1121 已提交
1612 1613 1614
  if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
    qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
  }
D
dapan1121 已提交
1615
}
D
dapan1121 已提交
1616

D
dapan1121 已提交
1617
int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
dengyihao's avatar
dengyihao 已提交
1618 1619 1620
  /*
    SQWSchStatus *sch = NULL;
    int32_t taskNum = 0;
1621

dengyihao's avatar
dengyihao 已提交
1622
    QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
1623

dengyihao's avatar
dengyihao 已提交
1624
    sch->lastAccessTs = taosGetTimestampSec();
1625

dengyihao's avatar
dengyihao 已提交
1626
    QW_LOCK(QW_READ, &sch->tasksLock);
1627

dengyihao's avatar
dengyihao 已提交
1628
    taskNum = taosHashGetSize(sch->tasksHash);
D
dapan1121 已提交
1629

dengyihao's avatar
dengyihao 已提交
1630 1631 1632 1633 1634 1635
    int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
    *rsp = taosMemoryCalloc(1, size);
    if (NULL == *rsp) {
      QW_SCH_ELOG("calloc %d failed", size);
      QW_UNLOCK(QW_READ, &sch->tasksLock);
      qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1636

dengyihao's avatar
dengyihao 已提交
1637 1638
      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }
D
dapan1121 已提交
1639

dengyihao's avatar
dengyihao 已提交
1640 1641 1642 1643 1644 1645 1646 1647
    void *key = NULL;
    size_t keyLen = 0;
    int32_t i = 0;

    void *pIter = taosHashIterate(sch->tasksHash, NULL);
    while (pIter) {
      SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
      taosHashGetKey(pIter, &key, &keyLen);
D
dapan1121 已提交
1648

dengyihao's avatar
dengyihao 已提交
1649 1650
      QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
      (*rsp)->status[i].status = taskStatus->status;
D
dapan1121 已提交
1651

dengyihao's avatar
dengyihao 已提交
1652 1653 1654
      ++i;
      pIter = taosHashIterate(sch->tasksHash, pIter);
    }
D
dapan1121 已提交
1655

dengyihao's avatar
dengyihao 已提交
1656 1657 1658 1659 1660 1661 1662
    QW_UNLOCK(QW_READ, &sch->tasksLock);
    qwReleaseScheduler(QW_READ, mgmt);

    (*rsp)->num = taskNum;
  */
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1663

D
dapan1121 已提交
1664
int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
D
dapan1121 已提交
1665
  SQWSchStatus *sch = NULL;
D
dapan1121 已提交
1666

dengyihao's avatar
dengyihao 已提交
1667 1668
  /*
    QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
D
dapan1121 已提交
1669

dengyihao's avatar
dengyihao 已提交
1670
    sch->lastAccessTs = taosGetTimestampSec();
D
dapan1121 已提交
1671

dengyihao's avatar
dengyihao 已提交
1672 1673
    qwReleaseScheduler(QW_READ, mgmt);
  */
D
dapan1121 已提交
1674 1675 1676
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1677
int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
dengyihao's avatar
dengyihao 已提交
1678
  SQWSchStatus * sch = NULL;
D
dapan1121 已提交
1679
  SQWTaskStatus *task = NULL;
dengyihao's avatar
dengyihao 已提交
1680
  int32_t        code = 0;
D
dapan1121 已提交
1681

dengyihao's avatar
dengyihao 已提交
1682 1683 1684 1685 1686
  /*
    if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
      *taskStatus = JOB_TASK_STATUS_NULL;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1687

dengyihao's avatar
dengyihao 已提交
1688 1689
    if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) {
      qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1690

dengyihao's avatar
dengyihao 已提交
1691 1692 1693
      *taskStatus = JOB_TASK_STATUS_NULL;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1694

dengyihao's avatar
dengyihao 已提交
1695 1696 1697 1698 1699
    *taskStatus = task->status;

    qwReleaseTask(QW_READ, sch);
    qwReleaseScheduler(QW_READ, mgmt);
  */
D
dapan1121 已提交
1700 1701 1702 1703

  QW_RET(code);
}

D
dapan1121 已提交
1704
int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
dengyihao's avatar
dengyihao 已提交
1705
  SQWSchStatus * sch = NULL;
D
dapan1121 已提交
1706
  SQWTaskStatus *task = NULL;
dengyihao's avatar
dengyihao 已提交
1707 1708 1709 1710
  int32_t        code = 0;

  /*
    QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
D
dapan1121 已提交
1711

dengyihao's avatar
dengyihao 已提交
1712
    QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
D
dapan1121 已提交
1713

D
dapan1121 已提交
1714

dengyihao's avatar
dengyihao 已提交
1715
    QW_LOCK(QW_WRITE, &task->lock);
D
dapan1121 已提交
1716

dengyihao's avatar
dengyihao 已提交
1717
    task->cancel = true;
D
dapan1121 已提交
1718

dengyihao's avatar
dengyihao 已提交
1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732
    int8_t oriStatus = task->status;
    int8_t newStatus = 0;

    if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START || task->status ==
  JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) { QW_UNLOCK(QW_WRITE, &task->lock);

      qwReleaseTask(QW_READ, sch);
      qwReleaseScheduler(QW_READ, mgmt);

      return TSDB_CODE_SUCCESS;
    } else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status ==
  JOB_TASK_STATUS_PARTIAL_SUCCEED) { QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); } else {
      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLING));
    }
D
dapan1121 已提交
1733

D
dapan1121 已提交
1734
    QW_UNLOCK(QW_WRITE, &task->lock);
D
dapan1121 已提交
1735

D
dapan1121 已提交
1736 1737
    qwReleaseTask(QW_READ, sch);
    qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1738

dengyihao's avatar
dengyihao 已提交
1739 1740 1741
    if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
      //TODO call executer to cancel subquery async
    }
D
dapan1121 已提交
1742

dengyihao's avatar
dengyihao 已提交
1743
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1744

dengyihao's avatar
dengyihao 已提交
1745
  _return:
D
dapan1121 已提交
1746

dengyihao's avatar
dengyihao 已提交
1747 1748
    if (task) {
      QW_UNLOCK(QW_WRITE, &task->lock);
D
dapan1121 已提交
1749

dengyihao's avatar
dengyihao 已提交
1750 1751 1752 1753 1754 1755 1756
      qwReleaseTask(QW_READ, sch);
    }

    if (sch) {
      qwReleaseScheduler(QW_READ, mgmt);
    }
  */
D
dapan1121 已提交
1757

D
dapan1121 已提交
1758
  QW_RET(code);
D
dapan1121 已提交
1759
}