qworker.c 48.5 KB
Newer Older
D
dapan1121 已提交
1
#include "qworker.h"
dengyihao's avatar
dengyihao 已提交
2
#include "dataSinkMgt.h"
3
#include "executor.h"
D
dapan1121 已提交
4
#include "planner.h"
H
Haojun Liao 已提交
5 6
#include "query.h"
#include "qworkerInt.h"
D
dapan1121 已提交
7
#include "qworkerMsg.h"
dengyihao's avatar
dengyihao 已提交
8
#include "tcommon.h"
H
Haojun Liao 已提交
9
#include "tmsg.h"
10
#include "tname.h"
D
dapan1121 已提交
11

12
SQWDebug     gQWDebug = {.statusEnable = true, .dumpEnable = true};
D
dapan1121 已提交
13
SQWorkerMgmt gQwMgmt = {
14 15 16
    .lock = 0,
    .qwRef = -1,
    .qwNum = 0,
D
dapan1121 已提交
17
};
18

D
dapan1121 已提交
19 20 21
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
22
  }
23

D
dapan1121 已提交
24
  int32_t code = 0;
D
dapan1121 已提交
25

D
dapan1121 已提交
26
  if (oriStatus == newStatus) {
D
dapan1121 已提交
27 28 29 30
    if (newStatus == JOB_TASK_STATUS_EXECUTING || newStatus == JOB_TASK_STATUS_FAILED) {
      *ignore = true;
      return TSDB_CODE_SUCCESS;
    }
H
Haojun Liao 已提交
31

D
dapan1121 已提交
32 33
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }
dengyihao's avatar
dengyihao 已提交
34

D
dapan1121 已提交
35 36
  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
dengyihao's avatar
dengyihao 已提交
37 38
      if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED &&
          newStatus != JOB_TASK_STATUS_NOT_START) {
D
dapan1121 已提交
39 40
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
dengyihao's avatar
dengyihao 已提交
41

D
dapan1121 已提交
42 43
      break;
    case JOB_TASK_STATUS_NOT_START:
D
dapan1121 已提交
44
      if (newStatus != JOB_TASK_STATUS_CANCELLED) {
D
dapan1121 已提交
45 46
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
dengyihao's avatar
dengyihao 已提交
47

D
dapan1121 已提交
48 49
      break;
    case JOB_TASK_STATUS_EXECUTING:
dengyihao's avatar
dengyihao 已提交
50 51 52
      if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED && newStatus != JOB_TASK_STATUS_SUCCEED &&
          newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING &&
          newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
53 54
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
dengyihao's avatar
dengyihao 已提交
55

D
dapan1121 已提交
56 57
      break;
    case JOB_TASK_STATUS_PARTIAL_SUCCEED:
dengyihao's avatar
dengyihao 已提交
58 59 60
      if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_SUCCEED &&
          newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_FAILED &&
          newStatus != JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
61 62
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
dengyihao's avatar
dengyihao 已提交
63

D
dapan1121 已提交
64 65
      break;
    case JOB_TASK_STATUS_SUCCEED:
dengyihao's avatar
dengyihao 已提交
66 67
      if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING &&
          newStatus != JOB_TASK_STATUS_FAILED) {
D
dapan1121 已提交
68 69 70 71
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }

      break;
D
dapan1121 已提交
72
    case JOB_TASK_STATUS_FAILED:
D
dapan1121 已提交
73 74 75 76
      if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;
H
Haojun Liao 已提交
77

D
dapan1121 已提交
78 79 80 81
    case JOB_TASK_STATUS_CANCELLING:
      if (newStatus != JOB_TASK_STATUS_CANCELLED) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
dengyihao's avatar
dengyihao 已提交
82

D
dapan1121 已提交
83 84
      break;
    case JOB_TASK_STATUS_CANCELLED:
D
dapan1121 已提交
85
    case JOB_TASK_STATUS_DROPPING:
D
dapan1121 已提交
86 87 88
      if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
D
dapan1121 已提交
89
      break;
dengyihao's avatar
dengyihao 已提交
90

D
dapan1121 已提交
91
    default:
D
dapan1121 已提交
92
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
D
dapan1121 已提交
93 94 95
      return TSDB_CODE_QRY_APP_ERROR;
  }

D
dapan1121 已提交
96
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
97

D
dapan1121 已提交
98
_return:
D
dapan1121 已提交
99

D
dapan1121 已提交
100
  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
D
dapan1121 已提交
101
  QW_RET(code);
D
dapan1121 已提交
102 103
}

dengyihao's avatar
dengyihao 已提交
104
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {}
D
dapan1121 已提交
105

D
dapan1121 已提交
106
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
D
dapan1121 已提交
107 108 109 110 111
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);
dengyihao's avatar
dengyihao 已提交
112

113
  /*QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));*/
D
dapan1121 已提交
114

115
  void         *key = NULL;
dengyihao's avatar
dengyihao 已提交
116 117
  size_t        keyLen = 0;
  int32_t       i = 0;
D
dapan1121 已提交
118 119 120 121 122 123 124 125 126 127 128 129
  SQWSchStatus *sch = NULL;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
    sch = (SQWSchStatus *)pIter;
    qwDbgDumpSchInfo(sch, i);
    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

130
  /*QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));*/
D
dapan1121 已提交
131
}
D
dapan1121 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168

char *qwPhaseStr(int32_t phase) {
  switch (phase) {
    case QW_PHASE_PRE_QUERY:
      return "PRE_QUERY";
    case QW_PHASE_POST_QUERY:
      return "POST_QUERY";
    case QW_PHASE_PRE_FETCH:
      return "PRE_FETCH";
    case QW_PHASE_POST_FETCH:
      return "POST_FETCH";
    case QW_PHASE_PRE_CQUERY:
      return "PRE_CQUERY";
    case QW_PHASE_POST_CQUERY:
      return "POST_CQUERY";
    default:
      break;
  }

  return "UNKNOWN";
}

char *qwBufStatusStr(int32_t bufStatus) {
  switch (bufStatus) {
    case DS_BUF_LOW:
      return "LOW";
    case DS_BUF_FULL:
      return "FULL";
    case DS_BUF_EMPTY:
      return "EMPTY";
    default:
      break;
  }

  return "UNKNOWN";
}

D
dapan1121 已提交
169
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
D
dapan1121 已提交
170
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
171 172
  int8_t  origStatus = 0;
  bool    ignore = false;
D
dapan1121 已提交
173 174 175

  while (true) {
    origStatus = atomic_load_8(&task->status);
dengyihao's avatar
dengyihao 已提交
176

D
dapan1121 已提交
177 178 179 180
    QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore));
    if (ignore) {
      break;
    }
dengyihao's avatar
dengyihao 已提交
181

D
dapan1121 已提交
182 183
    if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) {
      continue;
D
dapan1121 已提交
184
    }
dengyihao's avatar
dengyihao 已提交
185

D
dapan1121 已提交
186
    QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(origStatus), jobTaskStatusStr(status));
D
dapan1121 已提交
187 188

    break;
D
dapan1121 已提交
189
  }
dengyihao's avatar
dengyihao 已提交
190

D
dapan1121 已提交
191 192 193
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
194
int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) {
D
dapan1121 已提交
195
  SQWSchStatus newSch = {0};
dengyihao's avatar
dengyihao 已提交
196 197
  newSch.tasksHash =
      taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
D
dapan1121 已提交
198
  if (NULL == newSch.tasksHash) {
D
dapan1121 已提交
199 200
    QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
201 202
  }

D
dapan1121 已提交
203 204 205 206 207
  QW_LOCK(QW_WRITE, &mgmt->schLock);
  int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      QW_UNLOCK(QW_WRITE, &mgmt->schLock);
dengyihao's avatar
dengyihao 已提交
208

D
dapan1121 已提交
209 210 211
      QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
      taosHashCleanup(newSch.tasksHash);
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
212
    }
D
dapan1121 已提交
213 214

    taosHashCleanup(newSch.tasksHash);
D
dapan1121 已提交
215
  }
D
dapan1121 已提交
216
  QW_UNLOCK(QW_WRITE, &mgmt->schLock);
D
dapan1121 已提交
217

dengyihao's avatar
dengyihao 已提交
218
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
219 220
}

D
dapan1121 已提交
221
int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
D
dapan1121 已提交
222 223 224 225 226
  while (true) {
    QW_LOCK(rwType, &mgmt->schLock);
    *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
    if (NULL == (*sch)) {
      QW_UNLOCK(rwType, &mgmt->schLock);
dengyihao's avatar
dengyihao 已提交
227

D
dapan1121 已提交
228
      if (QW_NOT_EXIST_ADD == nOpt) {
D
dapan1121 已提交
229
        QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));
D
dapan1121 已提交
230 231

        nOpt = QW_NOT_EXIST_RET_ERR;
dengyihao's avatar
dengyihao 已提交
232

D
dapan1121 已提交
233 234 235 236
        continue;
      } else if (QW_NOT_EXIST_RET_ERR == nOpt) {
        QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
      } else {
D
dapan1121 已提交
237
        QW_SCH_ELOG("unknown notExistOpt:%d", nOpt);
D
dapan1121 已提交
238
        QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
239
      }
D
dapan1121 已提交
240
    }
D
dapan1121 已提交
241 242

    break;
D
dapan1121 已提交
243 244 245 246 247
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
248
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
D
dapan1121 已提交
249
  return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
D
dapan1121 已提交
250 251
}

D
dapan1121 已提交
252
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
D
dapan1121 已提交
253
  return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
D
dapan1121 已提交
254 255
}

D
dapan1121 已提交
256
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); }
D
dapan1121 已提交
257

D
dapan1121 已提交
258
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
D
dapan1121 已提交
259 260 261 262 263 264 265 266 267 268 269 270 271
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

  QW_LOCK(rwType, &sch->tasksLock);
  *task = taosHashGet(sch->tasksHash, id, sizeof(id));
  if (NULL == (*task)) {
    QW_UNLOCK(rwType, &sch->tasksLock);
    QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
272
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
D
dapan1121 已提交
273 274
  int32_t code = 0;

D
dapan1121 已提交
275 276
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
D
dapan1121 已提交
277

D
dapan1121 已提交
278 279
  SQWTaskStatus ntask = {0};
  ntask.status = status;
D
dapan1121 已提交
280
  ntask.refId = rId;
D
dapan1121 已提交
281 282 283 284 285 286

  QW_LOCK(QW_WRITE, &sch->tasksLock);
  code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
  if (0 != code) {
    QW_UNLOCK(QW_WRITE, &sch->tasksLock);
    if (HASH_NODE_EXIST(code)) {
D
dapan1121 已提交
287 288
      if (rwType && task) {
        QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
D
dapan1121 已提交
289
      } else {
D
dapan1121 已提交
290
        QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
291
        QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
D
dapan1121 已提交
292 293
      }
    } else {
D
dapan1121 已提交
294
      QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
295
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
296 297 298
    }
  }
  QW_UNLOCK(QW_WRITE, &sch->tasksLock);
D
dapan1121 已提交
299

D
dapan1121 已提交
300 301
  QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status));

D
dapan1121 已提交
302 303
  if (rwType && task) {
    QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
D
dapan1121 已提交
304 305
  }

D
dapan1121 已提交
306 307
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
308

D
dapan1121 已提交
309
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
D
dapan1121 已提交
310
  SQWSchStatus *tsch = NULL;
dengyihao's avatar
dengyihao 已提交
311
  int32_t       code = 0;
D
dapan1121 已提交
312
  QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch));
D
dapan1121 已提交
313

D
dapan1121 已提交
314
  QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL));
D
dapan1121 已提交
315 316 317 318

_return:

  qwReleaseScheduler(QW_READ, mgmt);
dengyihao's avatar
dengyihao 已提交
319

D
dapan1121 已提交
320
  QW_RET(code);
D
dapan1121 已提交
321 322
}

dengyihao's avatar
dengyihao 已提交
323 324
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status,
                               SQWTaskStatus **task) {
D
dapan1121 已提交
325
  return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
D
dapan1121 已提交
326
}
D
dapan1121 已提交
327

dengyihao's avatar
dengyihao 已提交
328
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { QW_UNLOCK(rwType, &sch->tasksLock); }
D
dapan1121 已提交
329

D
dapan1121 已提交
330
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
D
dapan1121 已提交
331 332
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
H
Haojun Liao 已提交
333

D
dapan1121 已提交
334
  *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
D
dapan1121 已提交
335
  if (NULL == (*ctx)) {
D
dapan1121 已提交
336
    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
D
dapan1121 已提交
337
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
338 339 340 341 342
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
343
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
D
dapan1121 已提交
344 345
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
dengyihao's avatar
dengyihao 已提交
346

D
dapan1121 已提交
347 348
  *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == (*ctx)) {
D
dapan1121 已提交
349
    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
D
dapan1121 已提交
350
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
351 352 353 354 355
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
356
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
D
dapan1121 已提交
357 358 359
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

D
dapan1121 已提交
360
  SQWTaskCtx nctx = {0};
D
dapan1121 已提交
361

D
dapan1121 已提交
362
  int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
D
dapan1121 已提交
363 364
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
D
dapan1121 已提交
365
      if (acquire && ctx) {
D
dapan1121 已提交
366
        QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
367 368
      } else if (ctx) {
        QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
369
      } else {
D
dapan1121 已提交
370
        QW_TASK_ELOG_E("task ctx already exist");
D
dapan1121 已提交
371 372 373
        QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
      }
    } else {
D
dapan1121 已提交
374
      QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code);
D
dapan1121 已提交
375 376 377 378
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

D
dapan1121 已提交
379
  if (acquire && ctx) {
D
dapan1121 已提交
380
    QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
381 382
  } else if (ctx) {
    QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
383 384 385 386 387
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
388
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL)); }
D
dapan1121 已提交
389

dengyihao's avatar
dengyihao 已提交
390
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); }
D
dapan1121 已提交
391

D
dapan1121 已提交
392
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
D
dapan1121 已提交
393

dengyihao's avatar
dengyihao 已提交
394
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
D
dapan1121 已提交
395
  // Note: free/kill may in RC
D
dapan1121 已提交
396 397 398
  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
  if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
    qDestroyTask(otaskHandle);
D
dapan1121 已提交
399 400
  }
}
D
dapan1121 已提交
401

D
dapan1121 已提交
402 403
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  int32_t code = 0;
D
dapan1121 已提交
404
  // Note: free/kill may in RC
D
dapan1121 已提交
405 406
  qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
  if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
D
dapan1121 已提交
407
    code = qAsyncKillTask(taskHandle);
D
dapan1121 已提交
408
    atomic_store_ptr(&ctx->taskHandle, taskHandle);
D
dapan1121 已提交
409
  }
D
dapan1121 已提交
410

D
dapan1121 已提交
411 412 413 414
  QW_RET(code);
}

void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
415
  tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
D
dapan1121 已提交
416
  ctx->ctrlConnInfo.handle = NULL;
dengyihao's avatar
dengyihao 已提交
417
  ctx->ctrlConnInfo.refId = -1;
D
dapan1121 已提交
418 419

  // NO need to release dataConnInfo
D
dapan1121 已提交
420

D
dapan1121 已提交
421
  qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
dengyihao's avatar
dengyihao 已提交
422

D
dapan1121 已提交
423 424 425
  if (ctx->sinkHandle) {
    dsDestroyDataSinker(ctx->sinkHandle);
    ctx->sinkHandle = NULL;
D
dapan1121 已提交
426
  }
D
dapan1121 已提交
427 428 429 430 431

  if (ctx->plan) {
    nodesDestroyNode(ctx->plan);
    ctx->plan = NULL;
  }
D
dapan1121 已提交
432
}
D
dapan1121 已提交
433

D
dapan1121 已提交
434
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
D
dapan1121 已提交
435 436 437
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
  SQWTaskCtx octx;
D
dapan1121 已提交
438

D
dapan1121 已提交
439 440
  SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == ctx) {
D
dapan1121 已提交
441
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
442
  }
D
dapan1121 已提交
443

D
dapan1121 已提交
444
  octx = *ctx;
D
dapan1121 已提交
445

D
dapan1121 已提交
446 447
  atomic_store_ptr(&ctx->taskHandle, NULL);
  atomic_store_ptr(&ctx->sinkHandle, NULL);
D
dapan1121 已提交
448
  atomic_store_ptr(&ctx->plan, NULL);
D
dapan1121 已提交
449

D
dapan1121 已提交
450 451
  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);

D
dapan1121 已提交
452
  if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
dengyihao's avatar
dengyihao 已提交
453
    QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
D
dapan1121 已提交
454
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
455 456
  }

D
dapan1121 已提交
457
  qwFreeTask(QW_FPARAMS(), &octx);
D
dapan1121 已提交
458 459

  QW_TASK_DLOG_E("task ctx dropped");
dengyihao's avatar
dengyihao 已提交
460

D
dapan1121 已提交
461 462 463
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
464
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
465
  SQWSchStatus  *sch = NULL;
D
dapan1121 已提交
466
  SQWTaskStatus *task = NULL;
dengyihao's avatar
dengyihao 已提交
467 468
  int32_t        code = 0;

D
dapan1121 已提交
469 470
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
D
dapan1121 已提交
471

D
dapan1121 已提交
472
  if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
D
dapan1121 已提交
473
    QW_TASK_WLOG_E("scheduler does not exist");
D
dapan1121 已提交
474 475
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
476

D
dapan1121 已提交
477 478
  if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
    qwReleaseScheduler(QW_WRITE, mgmt);
dengyihao's avatar
dengyihao 已提交
479

D
dapan1121 已提交
480
    QW_TASK_WLOG_E("task does not exist");
D
dapan1121 已提交
481 482
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
483

D
dapan1121 已提交
484
  if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
D
dapan1121 已提交
485
    QW_TASK_ELOG_E("taosHashRemove task from hash failed");
D
dapan1121 已提交
486 487 488
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
489
  QW_TASK_DLOG_E("task status dropped");
D
dapan1121 已提交
490 491 492

_return:

D
dapan1121 已提交
493 494 495
  if (task) {
    qwReleaseTaskStatus(QW_WRITE, sch);
  }
D
dapan1121 已提交
496
  qwReleaseScheduler(QW_WRITE, mgmt);
dengyihao's avatar
dengyihao 已提交
497

D
dapan1121 已提交
498 499 500
  QW_RET(code);
}

D
dapan1121 已提交
501
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
502
  SQWSchStatus  *sch = NULL;
D
dapan1121 已提交
503
  SQWTaskStatus *task = NULL;
dengyihao's avatar
dengyihao 已提交
504
  int32_t        code = 0;
D
dapan1121 已提交
505

D
dapan1121 已提交
506
  QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
D
dapan1121 已提交
507
  QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
D
dapan1121 已提交
508

D
dapan1121 已提交
509
  QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));
dengyihao's avatar
dengyihao 已提交
510

D
dapan1121 已提交
511 512
_return:

D
dapan1121 已提交
513 514 515
  if (task) {
    qwReleaseTaskStatus(QW_READ, sch);
  }
D
dapan1121 已提交
516
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
517 518 519 520

  QW_RET(code);
}

D
dapan1121 已提交
521
int32_t qwDropTask(QW_FPARAMS_DEF) {
H
Haojun Liao 已提交
522
  QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
D
dapan1121 已提交
523
  QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
H
Haojun Liao 已提交
524

D
dapan1121 已提交
525 526
  QW_TASK_DLOG_E("task is dropped");

D
dapan1121 已提交
527 528 529
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
530
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
dengyihao's avatar
dengyihao 已提交
531
  qTaskInfo_t *taskHandle = &ctx->taskHandle;
D
dapan1121 已提交
532 533 534 535

  if (TASK_TYPE_TEMP == ctx->taskType) {
    if (ctx->explain) {
      SExplainExecInfo *execInfo = NULL;
dengyihao's avatar
dengyihao 已提交
536
      int32_t           resNum = 0;
D
dapan1121 已提交
537
      QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));
538 539 540 541

      SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
      connInfo.ahandle = NULL;
      QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
D
dapan1121 已提交
542
    }
dengyihao's avatar
dengyihao 已提交
543

D
dapan1121 已提交
544 545 546 547 548 549
    qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
550
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
dengyihao's avatar
dengyihao 已提交
551 552
  int32_t        code = 0;
  bool           qcontinue = true;
553
  SSDataBlock   *pRes = NULL;
dengyihao's avatar
dengyihao 已提交
554 555 556
  uint64_t       useconds = 0;
  int32_t        i = 0;
  int32_t        execNum = 0;
557
  qTaskInfo_t   *taskHandle = &ctx->taskHandle;
D
dapan1121 已提交
558
  DataSinkHandle sinkHandle = ctx->sinkHandle;
dengyihao's avatar
dengyihao 已提交
559

D
dapan1121 已提交
560
  while (true) {
H
Haojun Liao 已提交
561
    QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
D
dapan1121 已提交
562

D
dapan1121 已提交
563
    code = qExecTask(*taskHandle, &pRes, &useconds);
D
dapan1121 已提交
564
    if (code) {
D
dapan1121 已提交
565 566
      QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
D
dapan1121 已提交
567 568
    }

D
dapan1121 已提交
569 570
    ++execNum;

D
dapan1121 已提交
571
    if (NULL == pRes) {
dengyihao's avatar
dengyihao 已提交
572
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
H
Haojun Liao 已提交
573

D
dapan1121 已提交
574
      dsEndPut(sinkHandle, useconds);
D
dapan1121 已提交
575 576

      QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
D
dapan1121 已提交
577 578 579 580

      if (queryEnd) {
        *queryEnd = true;
      }
dengyihao's avatar
dengyihao 已提交
581

D
dapan1121 已提交
582 583 584
      break;
    }

D
dapan1121 已提交
585
    int32_t rows = pRes->info.rows;
H
Haojun Liao 已提交
586

587 588
    ASSERT(pRes->info.rows > 0);

H
Haojun Liao 已提交
589
    SInputData inputData = {.pData = pRes};
D
dapan1121 已提交
590 591
    code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
    if (code) {
D
dapan1121 已提交
592 593
      QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
D
dapan1121 已提交
594
    }
D
dapan1121 已提交
595

D
dapan1121 已提交
596
    QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);
dengyihao's avatar
dengyihao 已提交
597

D
dapan1121 已提交
598 599 600 601
    if (!qcontinue) {
      break;
    }

D
dapan1121 已提交
602 603 604 605 606
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
      break;
    }

    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
607 608
      break;
    }
D
dapan1121 已提交
609

D
dapan1121 已提交
610 611 612
    if (atomic_load_32(&ctx->rspCode)) {
      break;
    }
D
dapan1121 已提交
613 614 615 616
  }

  QW_RET(code);
}
D
dapan1121 已提交
617

D
dapan1121 已提交
618
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
D
dapan1121 已提交
619 620
  int32_t taskNum = 0;

D
dapan1121 已提交
621
  hbInfo->connInfo = sch->hbConnInfo;
D
dapan1121 已提交
622
  hbInfo->rsp.epId = sch->hbEpId;
D
dapan1121 已提交
623

D
dapan1121 已提交
624
  QW_LOCK(QW_READ, &sch->tasksLock);
dengyihao's avatar
dengyihao 已提交
625

D
dapan1121 已提交
626
  taskNum = taosHashGetSize(sch->tasksHash);
D
dapan1121 已提交
627 628 629

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
D
dapan1121 已提交
630
    QW_UNLOCK(QW_READ, &sch->tasksLock);
D
dapan1121 已提交
631
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
D
dapan1121 已提交
632 633 634
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

635
  void       *key = NULL;
dengyihao's avatar
dengyihao 已提交
636 637
  size_t      keyLen = 0;
  int32_t     i = 0;
D
dapan1121 已提交
638
  STaskStatus status = {0};
D
dapan1121 已提交
639 640 641 642

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
D
dapan1121 已提交
643
    key = taosHashGetKey(pIter, &keyLen);
D
dapan1121 已提交
644

dengyihao's avatar
dengyihao 已提交
645
    // TODO GET EXECUTOR API TO GET MORE INFO
D
dapan1121 已提交
646

D
dapan1121 已提交
647 648 649
    QW_GET_QTID(key, status.queryId, status.taskId);
    status.status = taskStatus->status;
    status.refId = taskStatus->refId;
dengyihao's avatar
dengyihao 已提交
650

D
dapan1121 已提交
651
    taosArrayPush(hbInfo->rsp.taskStatus, &status);
dengyihao's avatar
dengyihao 已提交
652

D
dapan1121 已提交
653 654
    ++i;
    pIter = taosHashIterate(sch->tasksHash, pIter);
dengyihao's avatar
dengyihao 已提交
655
  }
D
dapan1121 已提交
656 657 658 659 660 661

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
662
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
dengyihao's avatar
dengyihao 已提交
663
  int32_t            len = 0;
D
dapan1121 已提交
664
  SRetrieveTableRsp *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
665 666
  bool               queryEnd = false;
  int32_t            code = 0;
D
dapan1121 已提交
667

D
dapan1121 已提交
668
  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
D
dapan1121 已提交
669

D
dapan1121 已提交
670 671 672 673
  if (len < 0) {
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
D
dapan1121 已提交
674

D
dapan1121 已提交
675 676
  if (len == 0) {
    if (queryEnd) {
D
dapan 已提交
677
      code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
678
      if (code) {
D
dapan1121 已提交
679
        QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
680 681
        QW_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
682

D
dapan1121 已提交
683
      QW_TASK_DLOG_E("no data in sink and query end");
H
Haojun Liao 已提交
684

D
dapan1121 已提交
685
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
L
Liu Jicong 已提交
686
      QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
687

D
dapan1121 已提交
688
      *rspMsg = rsp;
D
dapan 已提交
689
      *dataLen = 0;
D
dapan1121 已提交
690 691
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
692 693

    pOutput->bufStatus = DS_BUF_EMPTY;
D
dapan1121 已提交
694

D
dapan1121 已提交
695
    return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
696
  }
D
dapan1121 已提交
697

D
dapan1121 已提交
698
  // Got data from sink
D
dapan1121 已提交
699
  QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
D
dapan1121 已提交
700

D
dapan 已提交
701
  *dataLen = len;
dengyihao's avatar
dengyihao 已提交
702

D
dapan1121 已提交
703
  QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
D
dapan 已提交
704
  *rspMsg = rsp;
dengyihao's avatar
dengyihao 已提交
705

D
dapan 已提交
706 707
  pOutput->pData = rsp->data;
  code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
708
  if (code) {
D
dapan1121 已提交
709
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
710 711
    QW_ERR_RET(code);
  }
D
dapan1121 已提交
712

D
dapan1121 已提交
713
  if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
D
dapan1121 已提交
714
    QW_TASK_DLOG_E("task all data fetched, done");
D
dapan1121 已提交
715
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
D
dapan1121 已提交
716 717
  }

D
dapan1121 已提交
718
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
719 720
}

D
dapan1121 已提交
721
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
722 723
  int32_t         code = 0;
  SQWTaskCtx     *ctx = NULL;
S
Shengliang Guan 已提交
724 725
  SRpcHandleInfo *dropConnection = NULL;
  SRpcHandleInfo *cancelConnection = NULL;
D
dapan1121 已提交
726

D
dapan1121 已提交
727
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
728 729 730 731 732 733

  if (QW_PHASE_PRE_QUERY == phase) {
    QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
  } else {
    QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
  }
dengyihao's avatar
dengyihao 已提交
734

D
dapan1121 已提交
735
  QW_LOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
736

D
dapan1121 已提交
737
  if (QW_PHASE_PRE_FETCH == phase) {
dengyihao's avatar
dengyihao 已提交
738
    atomic_store_8((int8_t *)&ctx->queryFetched, true);
D
dapan1121 已提交
739
  } else {
D
dapan1121 已提交
740 741
    atomic_store_8(&ctx->phase, phase);
  }
D
dapan1121 已提交
742

dengyihao's avatar
dengyihao 已提交
743
  if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
D
dapan1121 已提交
744 745 746
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }
D
dapan1121 已提交
747

D
dapan1121 已提交
748 749
  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
D
dapan1121 已提交
750
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
H
Haojun Liao 已提交
751
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
752
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
753 754
        break;
      }
D
dapan1121 已提交
755

D
dapan1121 已提交
756
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
757
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
758
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
759
        dropConnection = NULL;
760

D
dapan1121 已提交
761 762
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
763

D
dapan1121 已提交
764
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan 已提交
765
        break;
D
dapan1121 已提交
766
      }
D
dapan1121 已提交
767

D
dapan1121 已提交
768
      QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
D
dapan1121 已提交
769 770
      break;
    }
D
dapan1121 已提交
771
    case QW_PHASE_PRE_FETCH: {
D
dapan1121 已提交
772 773
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
774 775
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }
D
dapan1121 已提交
776

D
dapan1121 已提交
777
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
778
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
779 780 781 782
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

      if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
783
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
784
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
D
dapan1121 已提交
785 786
      }
      break;
dengyihao's avatar
dengyihao 已提交
787
    }
D
dapan1121 已提交
788 789
    case QW_PHASE_PRE_CQUERY: {
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
790
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
791
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
792
      }
D
dapan1121 已提交
793

D
dapan1121 已提交
794
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
795
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
796
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
797
        dropConnection = NULL;
H
Haojun Liao 已提交
798

D
dapan1121 已提交
799 800
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
801

D
dapan1121 已提交
802
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
803
      }
D
dapan1121 已提交
804

D
dapan1121 已提交
805
      break;
D
dapan1121 已提交
806 807 808 809 810 811 812
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
813
    QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode));
D
dapan1121 已提交
814
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
815
  }
D
dapan1121 已提交
816

D
dapan1121 已提交
817 818
_return:
  if (ctx) {
D
dapan1121 已提交
819
    QW_UPDATE_RSP_CODE(ctx, code);
dengyihao's avatar
dengyihao 已提交
820

D
dapan1121 已提交
821
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
822 823
    qwReleaseTaskCtx(mgmt, ctx);
  }
D
dapan1121 已提交
824

D
dapan1121 已提交
825
  if (dropConnection) {
S
shm  
Shengliang Guan 已提交
826
    qwBuildAndSendDropRsp(dropConnection, code);
D
dapan1121 已提交
827
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
828
  }
D
dapan1121 已提交
829

D
dapan1121 已提交
830
  if (cancelConnection) {
S
shm  
Shengliang Guan 已提交
831
    qwBuildAndSendCancelRsp(cancelConnection, code);
D
dapan1121 已提交
832
    QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
833 834
  }

835 836 837 838 839
  if (code != TSDB_CODE_SUCCESS) {
    QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  } else {
    QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  }
D
dapan1121 已提交
840 841 842 843 844

  QW_RET(code);
}

int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
S
Shengliang Guan 已提交
845 846 847 848
  int32_t         code = 0;
  SQWTaskCtx     *ctx = NULL;
  SRpcHandleInfo  connInfo = {0};
  SRpcHandleInfo *readyConnection = NULL;
D
dapan1121 已提交
849

D
dapan1121 已提交
850
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
dengyihao's avatar
dengyihao 已提交
851

D
dapan1121 已提交
852
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
853

D
dapan1121 已提交
854 855 856
  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
857
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
858 859 860 861
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (QW_PHASE_POST_QUERY == phase) {
D
dapan1121 已提交
862
#if 0    
D
dapan1121 已提交
863
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
864
      readyConnection = &ctx->connInfo;
D
dapan1121 已提交
865 866
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
    }
D
dapan1121 已提交
867
#else
868
    connInfo = ctx->ctrlConnInfo;
D
dapan1121 已提交
869
    readyConnection = &connInfo;
dengyihao's avatar
dengyihao 已提交
870

D
dapan1121 已提交
871 872
    QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
#endif
D
dapan1121 已提交
873 874
  }

D
dapan1121 已提交
875
  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
876 877 878 879
    if (QW_PHASE_POST_FETCH == phase) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }
D
dapan1121 已提交
880

D
dapan1121 已提交
881 882
    qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
883

D
dapan1121 已提交
884 885
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
886 887 888
  }

  if (ctx->rspCode) {
dengyihao's avatar
dengyihao 已提交
889 890
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
D
dapan1121 已提交
891
    QW_ERR_JRET(ctx->rspCode);
dengyihao's avatar
dengyihao 已提交
892
  }
D
dapan1121 已提交
893

D
dapan1121 已提交
894
  QW_ERR_JRET(input->code);
D
dapan1121 已提交
895 896 897

_return:

D
dapan1121 已提交
898 899 900 901
  if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
  }

D
dapan1121 已提交
902
  if (ctx) {
D
dapan1121 已提交
903
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
904

D
dapan1121 已提交
905 906 907
    if (QW_PHASE_POST_FETCH != phase) {
      atomic_store_8(&ctx->phase, phase);
    }
dengyihao's avatar
dengyihao 已提交
908

D
dapan1121 已提交
909
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
910
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
911 912
  }

D
dapan1121 已提交
913
  if (readyConnection) {
dengyihao's avatar
dengyihao 已提交
914
    qwBuildAndSendReadyRsp(readyConnection, code);
D
dapan1121 已提交
915
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
916 917
  }

D
dapan1121 已提交
918 919
  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
D
dapan1121 已提交
920 921
  }

D
dapan1121 已提交
922
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
923

D
dapan1121 已提交
924 925 926
  QW_RET(code);
}

D
dapan1121 已提交
927
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
928 929 930 931 932 933 934
  int32_t        code = 0;
  bool           queryRsped = false;
  SSubplan      *plan = NULL;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx    *ctx = NULL;
D
dapan1121 已提交
935

D
dapan1121 已提交
936
  QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
D
dapan1121 已提交
937

D
dapan1121 已提交
938
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
D
dapan1121 已提交
939 940

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
941

D
dapan1121 已提交
942
  atomic_store_8(&ctx->taskType, taskType);
D
dapan1121 已提交
943
  atomic_store_8(&ctx->explain, explain);
X
Xiaoyu Wang 已提交
944

945
  ctx->ctrlConnInfo = qwMsg->connInfo;
D
dapan1121 已提交
946

947
  /*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/
X
Xiaoyu Wang 已提交
948

D
dapan1121 已提交
949 950
  code = qStringToSubplan(qwMsg->msg, &plan);
  if (TSDB_CODE_SUCCESS != code) {
951 952
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
953
    QW_ERR_JRET(code);
D
dapan1121 已提交
954
  }
dengyihao's avatar
dengyihao 已提交
955

D
dapan1121 已提交
956 957
  ctx->plan = plan;

958
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
959
  if (code) {
D
dapan1121 已提交
960
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
961
    QW_ERR_JRET(code);
D
dapan1121 已提交
962
  }
D
dapan1121 已提交
963

H
Haojun Liao 已提交
964
  if (NULL == sinkHandle || NULL == pTaskInfo) {
D
dapan1121 已提交
965 966 967 968
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

dengyihao's avatar
dengyihao 已提交
969 970
  // QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
  // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
971

dengyihao's avatar
dengyihao 已提交
972
  // queryRsped = true;
D
dapan1121 已提交
973

D
dapan1121 已提交
974 975 976
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

D
dapan1121 已提交
977
  if (pTaskInfo && sinkHandle) {
D
dapan1121 已提交
978
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
D
dapan1121 已提交
979
  }
dengyihao's avatar
dengyihao 已提交
980

D
dapan1121 已提交
981 982
_return:

D
dapan1121 已提交
983 984
  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
985

dengyihao's avatar
dengyihao 已提交
986
  // if (!queryRsped) {
D
dapan1121 已提交
987 988 989
  //  qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
  //  QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  //}
D
dapan1121 已提交
990

D
dapan1121 已提交
991
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
992 993
}

D
dapan1121 已提交
994
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
995
  int32_t     code = 0;
D
dapan1121 已提交
996
  SQWTaskCtx *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
997 998
  int8_t      phase = 0;
  bool        needRsp = true;
D
dapan1121 已提交
999 1000

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1001

D
dapan1121 已提交
1002 1003
  QW_LOCK(QW_WRITE, &ctx->lock);

D
dapan1121 已提交
1004 1005 1006
  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG_E("task is dropping or already dropped");
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
1007
  }
dengyihao's avatar
dengyihao 已提交
1008

D
dapan1121 已提交
1009
  if (ctx->phase == QW_PHASE_PRE_QUERY) {
1010
    ctx->ctrlConnInfo = qwMsg->connInfo;
D
dapan1121 已提交
1011
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
D
dapan1121 已提交
1012 1013 1014 1015 1016 1017 1018
    needRsp = false;
    QW_TASK_DLOG_E("ready msg will not rsp now");
    goto _return;
  }

  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);

dengyihao's avatar
dengyihao 已提交
1019 1020 1021
  if (atomic_load_8((int8_t *)&ctx->queryEnd) || atomic_load_8((int8_t *)&ctx->queryFetched)) {
    QW_TASK_ELOG("got ready msg at wrong status, queryEnd:%d, queryFetched:%d", atomic_load_8((int8_t *)&ctx->queryEnd),
                 atomic_load_8((int8_t *)&ctx->queryFetched));
D
dapan1121 已提交
1022 1023 1024
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }

D
dapan1121 已提交
1025 1026 1027
  if (ctx->phase == QW_PHASE_POST_QUERY) {
    code = ctx->rspCode;
    goto _return;
D
dapan1121 已提交
1028 1029
  }

D
dapan1121 已提交
1030
  QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase));
H
Haojun Liao 已提交
1031

D
dapan1121 已提交
1032
  QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
1033 1034 1035

_return:

D
dapan1121 已提交
1036
  if (code && ctx) {
D
dapan1121 已提交
1037
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
1038 1039
  }

D
dapan1121 已提交
1040 1041 1042
  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
  }
H
Haojun Liao 已提交
1043

D
dapan1121 已提交
1044 1045
  if (ctx) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
1046
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
1047 1048
  }

D
dapan1121 已提交
1049
  if (needRsp) {
S
shm  
Shengliang Guan 已提交
1050
    qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
D
dapan1121 已提交
1051
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
1052 1053
  }

D
dapan1121 已提交
1054
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1055 1056
}

D
dapan1121 已提交
1057
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
1058
  SQWTaskCtx   *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
1059
  int32_t       code = 0;
1060
  SQWPhaseInput input = {0};
1061
  void         *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
1062 1063 1064
  int32_t       dataLen = 0;
  bool          queryEnd = false;

D
dapan1121 已提交
1065
  do {
D
dapan1121 已提交
1066
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
D
dapan1121 已提交
1067

D
dapan1121 已提交
1068
    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1069

dengyihao's avatar
dengyihao 已提交
1070 1071
    atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t *)&ctx->queryContinue, 0);
D
dapan1121 已提交
1072

D
dapan1121 已提交
1073
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));
D
dapan1121 已提交
1074

D
dapan1121 已提交
1075 1076 1077
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      SOutputData sOutput = {0};
      QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
dengyihao's avatar
dengyihao 已提交
1078 1079

      if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
1080
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
dengyihao's avatar
dengyihao 已提交
1081 1082

        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
1083
      }
dengyihao's avatar
dengyihao 已提交
1084

D
dapan1121 已提交
1085
      if (rsp) {
D
dapan1121 已提交
1086
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
1087

D
dapan1121 已提交
1088
        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
1089
        if (qComplete) {
dengyihao's avatar
dengyihao 已提交
1090
          atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
1091
        }
H
Haojun Liao 已提交
1092

D
dapan1121 已提交
1093
        qwMsg->connInfo = ctx->dataConnInfo;
dengyihao's avatar
dengyihao 已提交
1094 1095 1096 1097 1098
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

        qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
                     tstrerror(code), dataLen);
D
dapan1121 已提交
1099
      } else {
dengyihao's avatar
dengyihao 已提交
1100
        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
1101 1102 1103
      }
    }

dengyihao's avatar
dengyihao 已提交
1104
  _return:
1105

D
dapan1121 已提交
1106 1107 1108 1109
    if (NULL == ctx) {
      break;
    }

D
dapan1121 已提交
1110
    if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
dengyihao's avatar
dengyihao 已提交
1111
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
1112 1113
      qwFreeFetchRsp(rsp);
      rsp = NULL;
dengyihao's avatar
dengyihao 已提交
1114

D
dapan1121 已提交
1115
      qwMsg->connInfo = ctx->dataConnInfo;
S
shm  
Shengliang Guan 已提交
1116
      qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
dengyihao's avatar
dengyihao 已提交
1117 1118
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                   0);
1119
    }
D
dapan1121 已提交
1120

D
dapan1121 已提交
1121
    QW_LOCK(QW_WRITE, &ctx->lock);
dengyihao's avatar
dengyihao 已提交
1122
    if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
D
dapan1121 已提交
1123
      // Note: if necessary, fetch need to put cquery to queue again
D
dapan1121 已提交
1124
      atomic_store_8(&ctx->phase, 0);
dengyihao's avatar
dengyihao 已提交
1125
      QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
1126 1127
      break;
    }
dengyihao's avatar
dengyihao 已提交
1128
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
1129
  } while (true);
D
dapan1121 已提交
1130

D
dapan1121 已提交
1131
  input.code = code;
dengyihao's avatar
dengyihao 已提交
1132
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);
D
dapan1121 已提交
1133

dengyihao's avatar
dengyihao 已提交
1134
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1135
}
D
dapan1121 已提交
1136

D
dapan1121 已提交
1137
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
1138 1139 1140
  int32_t       code = 0;
  int32_t       dataLen = 0;
  bool          locked = false;
1141 1142
  SQWTaskCtx   *ctx = NULL;
  void         *rsp = NULL;
D
dapan1121 已提交
1143
  SQWPhaseInput input = {0};
D
dapan1121 已提交
1144

D
dapan1121 已提交
1145
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
1146

1147
  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
1148

D
dapan 已提交
1149 1150
  SOutputData sOutput = {0};
  QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
D
dapan1121 已提交
1151

1152
  if (NULL == rsp) {
D
dapan1121 已提交
1153 1154
    atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle);
    atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle);
dengyihao's avatar
dengyihao 已提交
1155

1156
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
D
dapan1121 已提交
1157
  } else {
D
dapan1121 已提交
1158
    bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
1159

D
dapan1121 已提交
1160
    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
1161
    if (qComplete) {
dengyihao's avatar
dengyihao 已提交
1162
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
1163
    }
D
dapan1121 已提交
1164 1165
  }

dengyihao's avatar
dengyihao 已提交
1166
  if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
1167
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
D
dapan1121 已提交
1168

D
dapan1121 已提交
1169 1170
    QW_LOCK(QW_WRITE, &ctx->lock);
    locked = true;
1171

D
dapan1121 已提交
1172
    // RC WARNING
D
dapan1121 已提交
1173
    if (QW_IS_QUERY_RUNNING(ctx)) {
dengyihao's avatar
dengyihao 已提交
1174 1175
      atomic_store_8((int8_t *)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
H
Haojun Liao 已提交
1176
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);
D
dapan1121 已提交
1177

dengyihao's avatar
dengyihao 已提交
1178 1179
      atomic_store_8((int8_t *)&ctx->queryInQueue, 1);

D
dapan1121 已提交
1180
      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
1181
    }
D
dapan 已提交
1182
  }
dengyihao's avatar
dengyihao 已提交
1183

D
dapan1121 已提交
1184
_return:
D
dapan1121 已提交
1185

D
dapan1121 已提交
1186 1187 1188 1189 1190
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
D
dapan1121 已提交
1191
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
D
dapan1121 已提交
1192

D
dapan 已提交
1193 1194 1195
  if (code) {
    qwFreeFetchRsp(rsp);
    rsp = NULL;
D
dapan1121 已提交
1196
    dataLen = 0;
D
dapan1121 已提交
1197 1198 1199
  }

  if (code || rsp) {
S
shm  
Shengliang Guan 已提交
1200
    qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
dengyihao's avatar
dengyihao 已提交
1201 1202
    QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                 dataLen);
D
dapan1121 已提交
1203 1204
  }

D
dapan1121 已提交
1205
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1206
}
D
dapan1121 已提交
1207

D
dapan1121 已提交
1208
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
1209 1210
  int32_t     code = 0;
  bool        rsped = false;
D
dapan1121 已提交
1211
  SQWTaskCtx *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
1212
  bool        locked = false;
D
dapan1121 已提交
1213

D
dapan1121 已提交
1214 1215
  // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED

D
dapan1121 已提交
1216
  QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
1217

D
dapan1121 已提交
1218 1219 1220 1221 1222
  QW_LOCK(QW_WRITE, &ctx->lock);

  locked = true;

  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
1223
    QW_TASK_WLOG_E("task already dropping");
D
dapan1121 已提交
1224 1225 1226
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

D
dapan1121 已提交
1227
  if (QW_IS_QUERY_RUNNING(ctx)) {
D
dapan1121 已提交
1228
    QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
D
dapan1121 已提交
1229
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
D
dapan1121 已提交
1230
  } else if (ctx->phase > 0) {
D
dapan1121 已提交
1231 1232 1233 1234
    if (0 == qwMsg->code) {
      qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
      QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
    }
1235

D
dapan1121 已提交
1236
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
1237
    rsped = true;
D
dapan1121 已提交
1238 1239
  } else {
    // task not started
D
dapan1121 已提交
1240
  }
D
dapan1121 已提交
1241

D
dapan1121 已提交
1242
  if (!rsped) {
1243
    ctx->ctrlConnInfo = qwMsg->connInfo;
dengyihao's avatar
dengyihao 已提交
1244

D
dapan1121 已提交
1245 1246
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }
1247

D
dapan1121 已提交
1248
_return:
D
dapan1121 已提交
1249

D
dapan1121 已提交
1250
  if (code) {
D
dapan1121 已提交
1251 1252 1253
    if (ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }
H
Haojun Liao 已提交
1254

D
dapan1121 已提交
1255
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
D
dapan1121 已提交
1256 1257
  }

D
dapan 已提交
1258 1259 1260 1261
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

D
dapan1121 已提交
1262
  if (ctx) {
D
dapan1121 已提交
1263
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
1264 1265
  }

D
dapan1121 已提交
1266
  if ((TSDB_CODE_SUCCESS != code) && (0 == qwMsg->code)) {
S
shm  
Shengliang Guan 已提交
1267
    qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
D
dapan1121 已提交
1268
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
1269
  }
D
dapan1121 已提交
1270

D
dapan1121 已提交
1271
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1272
}
D
dapan1121 已提交
1273

D
dapan1121 已提交
1274
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
dengyihao's avatar
dengyihao 已提交
1275
  int32_t         code = 0;
D
dapan1121 已提交
1276
  SSchedulerHbRsp rsp = {0};
1277
  SQWSchStatus   *sch = NULL;
D
dapan1121 已提交
1278 1279 1280 1281 1282 1283

  QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

  QW_LOCK(QW_WRITE, &sch->hbConnLock);

  if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
1284
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
D
dapan1121 已提交
1285 1286 1287 1288 1289 1290 1291
    sch->hbConnInfo.handle = NULL;
    sch->hbConnInfo.ahandle = NULL;

    QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
  } else {
    QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
  }
dengyihao's avatar
dengyihao 已提交
1292

D
dapan1121 已提交
1293
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
dengyihao's avatar
dengyihao 已提交
1294

D
dapan1121 已提交
1295
  qwReleaseScheduler(QW_READ, mgmt);
dengyihao's avatar
dengyihao 已提交
1296

D
dapan1121 已提交
1297 1298 1299
  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
1300
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
dengyihao's avatar
dengyihao 已提交
1301
  int32_t         code = 0;
D
dapan1121 已提交
1302
  SSchedulerHbRsp rsp = {0};
1303
  SQWSchStatus   *sch = NULL;
D
dapan1121 已提交
1304

D
dapan1121 已提交
1305 1306 1307
  if (qwMsg->code) {
    QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
  }
D
dapan1121 已提交
1308 1309 1310

  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

D
dapan1121 已提交
1311 1312
  QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));

D
dapan1121 已提交
1313
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
1314

D
dapan1121 已提交
1315
  if (sch->hbConnInfo.handle) {
1316
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
1317
    sch->hbConnInfo.handle = NULL;
D
dapan1121 已提交
1318
  }
D
dapan1121 已提交
1319

D
dapan1121 已提交
1320
  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
D
dapan1121 已提交
1321
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
dengyihao's avatar
dengyihao 已提交
1322

D
dapan1121 已提交
1323
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
dengyihao's avatar
dengyihao 已提交
1324 1325 1326

  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
          req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
1327

D
dapan1121 已提交
1328 1329 1330 1331
  qwReleaseScheduler(QW_READ, mgmt);

_return:

D
dapan1121 已提交
1332 1333
  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));

S
shm  
Shengliang Guan 已提交
1334
  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
D
dapan1121 已提交
1335 1336

  if (code) {
1337
    tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
1338
    qwMsg->connInfo.handle = NULL;
D
dapan1121 已提交
1339
  }
dengyihao's avatar
dengyihao 已提交
1340

1341
  /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/
dengyihao's avatar
dengyihao 已提交
1342

D
dapan1121 已提交
1343
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1344 1345 1346
}

void qwProcessHbTimerEvent(void *param, void *tmrId) {
1347
  SQWHbParam *hbParam = (SQWHbParam *)param;
D
dapan1121 已提交
1348 1349 1350 1351
  if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
    return;
  }

1352
  int64_t   refId = hbParam->refId;
D
dapan1121 已提交
1353 1354 1355 1356 1357 1358
  SQWorker *mgmt = qwAcquire(refId);
  if (NULL == mgmt) {
    QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
    taosMemoryFree(param);
    return;
  }
1359

D
dapan1121 已提交
1360
  SQWSchStatus *sch = NULL;
dengyihao's avatar
dengyihao 已提交
1361
  int32_t       taskNum = 0;
1362
  SQWHbInfo    *rspList = NULL;
dengyihao's avatar
dengyihao 已提交
1363
  int32_t       code = 0;
D
dapan1121 已提交
1364

D
dapan1121 已提交
1365 1366
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
1367 1368 1369 1370 1371
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1372
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1373
    qwRelease(refId);
D
dapan1121 已提交
1374
    return;
D
dapan1121 已提交
1375 1376
  }

wafwerar's avatar
wafwerar 已提交
1377
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
1378 1379
  if (NULL == rspList) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1380 1381
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1382
    qwRelease(refId);
D
dapan1121 已提交
1383
    return;
D
dapan1121 已提交
1384 1385
  }

1386
  void   *key = NULL;
dengyihao's avatar
dengyihao 已提交
1387
  size_t  keyLen = 0;
D
dapan1121 已提交
1388 1389 1390 1391
  int32_t i = 0;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
D
dapan1121 已提交
1392 1393 1394 1395 1396 1397 1398
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
    if (NULL == sch->hbConnInfo.handle) {
      uint64_t *sId = taosHashGetKey(pIter, NULL);
      QW_DLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
      pIter = taosHashIterate(mgmt->schHash, pIter);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1399

D
dapan1121 已提交
1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
1415
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
1416 1417
    /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
    /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
D
dapan1121 已提交
1418
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
1419 1420
  }

wafwerar's avatar
wafwerar 已提交
1421
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
1422

dengyihao's avatar
dengyihao 已提交
1423
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
1424
  qwRelease(refId);
D
dapan1121 已提交
1425 1426 1427 1428 1429 1430
}

void qwCloseRef(void) {
  taosWLockLatch(&gQwMgmt.lock);
  if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
    taosCloseRef(gQwMgmt.qwRef);
1431
    gQwMgmt.qwRef = -1;
D
dapan1121 已提交
1432 1433
  }
  taosWUnLockLatch(&gQwMgmt.lock);
D
dapan1121 已提交
1434 1435
}

1436
void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); }
D
dapan1121 已提交
1437

D
dapan1121 已提交
1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448
void qwDestroyImpl(void *pMgmt) {
  SQWorker *mgmt = (SQWorker *)pMgmt;

  taosTmrStopA(&mgmt->hbTimer);
  taosTmrCleanUp(mgmt->timer);

  // TODO STOP ALL QUERY

  // TODO FREE ALL

  taosHashCleanup(mgmt->ctxHash);
D
dapan1121 已提交
1449 1450 1451 1452 1453 1454

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
    qwDestroySchStatus(sch);
    pIter = taosHashIterate(mgmt->schHash, pIter);
1455
  }
D
dapan1121 已提交
1456 1457 1458 1459
  taosHashCleanup(mgmt->schHash);

  taosMemoryFree(mgmt);

1460
  atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1461 1462 1463 1464 1465 1466 1467

  qwCloseRef();
}

int32_t qwOpenRef(void) {
  taosWLockLatch(&gQwMgmt.lock);
  if (gQwMgmt.qwRef < 0) {
1468
    gQwMgmt.qwRef = taosOpenRef(100, qwDestroyImpl);
D
dapan1121 已提交
1469 1470 1471 1472 1473 1474 1475
    if (gQwMgmt.qwRef < 0) {
      taosWUnLockLatch(&gQwMgmt.lock);
      qError("init qworker ref failed");
      QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }
  taosWUnLockLatch(&gQwMgmt.lock);
1476

D
dapan1121 已提交
1477 1478 1479
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1480 1481 1482
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
  int32_t paramIdx = 0;
  int32_t newParamIdx = 0;
1483

D
dapan1121 已提交
1484 1485 1486 1487 1488 1489 1490
  while (true) {
    paramIdx = atomic_load_32(&gQwMgmt.paramIdx);
    if (paramIdx == tListLen(gQwMgmt.param)) {
      newParamIdx = 0;
    } else {
      newParamIdx = paramIdx + 1;
    }
1491

D
dapan1121 已提交
1492 1493 1494 1495 1496 1497 1498 1499 1500 1501
    if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) {
      break;
    }
  }

  gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef;
  gQwMgmt.param[paramIdx].refId = refId;

  *pParam = &gQwMgmt.param[paramIdx];
}
D
dapan1121 已提交
1502

S
Shengliang Guan 已提交
1503
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
1504
  if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
D
dapan1121 已提交
1505 1506 1507
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
S
Shengliang 已提交
1508

D
dapan1121 已提交
1509 1510 1511 1512
  int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
  if (1 == qwNum) {
    memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
  }
D
dapan1121 已提交
1513 1514 1515 1516 1517 1518 1519 1520

  int32_t code = qwOpenRef();
  if (code) {
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
    QW_RET(code);
  }

  SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
D
dapan1121 已提交
1521
  if (NULL == mgmt) {
D
dapan1121 已提交
1522 1523
    qError("calloc %d failed", (int32_t)sizeof(SQWorker));
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1524
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1525 1526 1527 1528
  }

  if (cfg) {
    mgmt->cfg = *cfg;
D
dapan1121 已提交
1529
    if (0 == mgmt->cfg.maxSchedulerNum) {
D
dapan1121 已提交
1530
      mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
D
dapan1121 已提交
1531 1532
    }
    if (0 == mgmt->cfg.maxTaskNum) {
D
dapan1121 已提交
1533
      mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
D
dapan1121 已提交
1534 1535
    }
    if (0 == mgmt->cfg.maxSchTaskNum) {
D
dapan1121 已提交
1536
      mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1537
    }
D
dapan1121 已提交
1538
  } else {
D
dapan1121 已提交
1539 1540 1541
    mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
    mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
    mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1542 1543
  }

dengyihao's avatar
dengyihao 已提交
1544 1545
  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                               HASH_ENTRY_LOCK);
D
dapan1121 已提交
1546
  if (NULL == mgmt->schHash) {
wafwerar's avatar
wafwerar 已提交
1547
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1548
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
D
dapan1121 已提交
1549
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1550 1551
  }

dengyihao's avatar
dengyihao 已提交
1552 1553
  mgmt->ctxHash =
      taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1554
  if (NULL == mgmt->ctxHash) {
D
dapan1121 已提交
1555
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
D
dapan1121 已提交
1556 1557 1558 1559 1560 1561 1562 1563 1564
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1565 1566
  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
S
Shengliang Guan 已提交
1567
  mgmt->msgCb = *pMsgCb;
D
dapan1121 已提交
1568

D
dapan1121 已提交
1569 1570 1571 1572 1573 1574
  mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
  if (mgmt->refId < 0) {
    qError("taosAddRef qw failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(terrno);
  }

D
dapan1121 已提交
1575 1576 1577
  SQWHbParam *param = NULL;
  qwSetHbParam(mgmt->refId, &param);

1578
  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer);
D
dapan1121 已提交
1579 1580 1581 1582
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
1583

D
dapan1121 已提交
1584 1585
  *qWorkerMgmt = mgmt;

D
dapan1121 已提交
1586 1587
  qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);

D
dapan1121 已提交
1588
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1589 1590 1591

_return:

D
dapan1121 已提交
1592 1593 1594 1595 1596 1597 1598
  if (mgmt->refId >= 0) {
    qwRelease(mgmt->refId);
  } else {
    taosHashCleanup(mgmt->schHash);
    taosHashCleanup(mgmt->ctxHash);
    taosTmrCleanUp(mgmt->timer);
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1599

1600
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1601
  }
1602

D
dapan1121 已提交
1603
  QW_RET(code);
D
dapan1121 已提交
1604
}
D
dapan1121 已提交
1605 1606 1607 1608

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1609
  }
D
dapan 已提交
1610

D
dapan1121 已提交
1611
  SQWorker *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1612

D
dapan1121 已提交
1613 1614 1615
  if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
    qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
  }
D
dapan1121 已提交
1616
}
D
dapan1121 已提交
1617

D
dapan1121 已提交
1618
int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
dengyihao's avatar
dengyihao 已提交
1619 1620 1621
  /*
    SQWSchStatus *sch = NULL;
    int32_t taskNum = 0;
1622

dengyihao's avatar
dengyihao 已提交
1623
    QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
1624

dengyihao's avatar
dengyihao 已提交
1625
    sch->lastAccessTs = taosGetTimestampSec();
1626

dengyihao's avatar
dengyihao 已提交
1627
    QW_LOCK(QW_READ, &sch->tasksLock);
1628

dengyihao's avatar
dengyihao 已提交
1629
    taskNum = taosHashGetSize(sch->tasksHash);
D
dapan1121 已提交
1630

dengyihao's avatar
dengyihao 已提交
1631 1632 1633 1634 1635 1636
    int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
    *rsp = taosMemoryCalloc(1, size);
    if (NULL == *rsp) {
      QW_SCH_ELOG("calloc %d failed", size);
      QW_UNLOCK(QW_READ, &sch->tasksLock);
      qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1637

dengyihao's avatar
dengyihao 已提交
1638 1639
      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }
D
dapan1121 已提交
1640

dengyihao's avatar
dengyihao 已提交
1641 1642 1643 1644 1645 1646 1647 1648
    void *key = NULL;
    size_t keyLen = 0;
    int32_t i = 0;

    void *pIter = taosHashIterate(sch->tasksHash, NULL);
    while (pIter) {
      SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
      taosHashGetKey(pIter, &key, &keyLen);
D
dapan1121 已提交
1649

dengyihao's avatar
dengyihao 已提交
1650 1651
      QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
      (*rsp)->status[i].status = taskStatus->status;
D
dapan1121 已提交
1652

dengyihao's avatar
dengyihao 已提交
1653 1654 1655
      ++i;
      pIter = taosHashIterate(sch->tasksHash, pIter);
    }
D
dapan1121 已提交
1656

dengyihao's avatar
dengyihao 已提交
1657 1658 1659 1660 1661 1662 1663
    QW_UNLOCK(QW_READ, &sch->tasksLock);
    qwReleaseScheduler(QW_READ, mgmt);

    (*rsp)->num = taskNum;
  */
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1664

D
dapan1121 已提交
1665
int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
D
dapan1121 已提交
1666
  SQWSchStatus *sch = NULL;
D
dapan1121 已提交
1667

dengyihao's avatar
dengyihao 已提交
1668 1669
  /*
    QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
D
dapan1121 已提交
1670

dengyihao's avatar
dengyihao 已提交
1671
    sch->lastAccessTs = taosGetTimestampSec();
D
dapan1121 已提交
1672

dengyihao's avatar
dengyihao 已提交
1673 1674
    qwReleaseScheduler(QW_READ, mgmt);
  */
D
dapan1121 已提交
1675 1676 1677
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1678
int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
1679
  SQWSchStatus  *sch = NULL;
D
dapan1121 已提交
1680
  SQWTaskStatus *task = NULL;
dengyihao's avatar
dengyihao 已提交
1681
  int32_t        code = 0;
D
dapan1121 已提交
1682

dengyihao's avatar
dengyihao 已提交
1683 1684 1685 1686 1687
  /*
    if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
      *taskStatus = JOB_TASK_STATUS_NULL;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1688

dengyihao's avatar
dengyihao 已提交
1689 1690
    if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) {
      qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1691

dengyihao's avatar
dengyihao 已提交
1692 1693 1694
      *taskStatus = JOB_TASK_STATUS_NULL;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1695

dengyihao's avatar
dengyihao 已提交
1696 1697 1698 1699 1700
    *taskStatus = task->status;

    qwReleaseTask(QW_READ, sch);
    qwReleaseScheduler(QW_READ, mgmt);
  */
D
dapan1121 已提交
1701 1702 1703 1704

  QW_RET(code);
}

D
dapan1121 已提交
1705
int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
1706
  SQWSchStatus  *sch = NULL;
D
dapan1121 已提交
1707
  SQWTaskStatus *task = NULL;
dengyihao's avatar
dengyihao 已提交
1708 1709 1710 1711
  int32_t        code = 0;

  /*
    QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
D
dapan1121 已提交
1712

dengyihao's avatar
dengyihao 已提交
1713
    QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
D
dapan1121 已提交
1714

D
dapan1121 已提交
1715

dengyihao's avatar
dengyihao 已提交
1716
    QW_LOCK(QW_WRITE, &task->lock);
D
dapan1121 已提交
1717

dengyihao's avatar
dengyihao 已提交
1718
    task->cancel = true;
D
dapan1121 已提交
1719

dengyihao's avatar
dengyihao 已提交
1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733
    int8_t oriStatus = task->status;
    int8_t newStatus = 0;

    if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START || task->status ==
  JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) { QW_UNLOCK(QW_WRITE, &task->lock);

      qwReleaseTask(QW_READ, sch);
      qwReleaseScheduler(QW_READ, mgmt);

      return TSDB_CODE_SUCCESS;
    } else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status ==
  JOB_TASK_STATUS_PARTIAL_SUCCEED) { QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); } else {
      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLING));
    }
D
dapan1121 已提交
1734

D
dapan1121 已提交
1735
    QW_UNLOCK(QW_WRITE, &task->lock);
D
dapan1121 已提交
1736

D
dapan1121 已提交
1737 1738
    qwReleaseTask(QW_READ, sch);
    qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1739

dengyihao's avatar
dengyihao 已提交
1740 1741 1742
    if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
      //TODO call executer to cancel subquery async
    }
D
dapan1121 已提交
1743

dengyihao's avatar
dengyihao 已提交
1744
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1745

dengyihao's avatar
dengyihao 已提交
1746
  _return:
D
dapan1121 已提交
1747

dengyihao's avatar
dengyihao 已提交
1748 1749
    if (task) {
      QW_UNLOCK(QW_WRITE, &task->lock);
D
dapan1121 已提交
1750

dengyihao's avatar
dengyihao 已提交
1751 1752 1753 1754 1755 1756 1757
      qwReleaseTask(QW_READ, sch);
    }

    if (sch) {
      qwReleaseScheduler(QW_READ, mgmt);
    }
  */
D
dapan1121 已提交
1758

D
dapan1121 已提交
1759
  QW_RET(code);
D
dapan1121 已提交
1760
}