qworker.c 44.1 KB
Newer Older
D
dapan1121 已提交
1
#include "qworker.h"
S
common  
Shengliang Guan 已提交
2
#include "tcommon.h"
3
#include "executor.h"
D
dapan1121 已提交
4
#include "planner.h"
H
Haojun Liao 已提交
5 6
#include "query.h"
#include "qworkerInt.h"
D
dapan1121 已提交
7
#include "qworkerMsg.h"
H
Haojun Liao 已提交
8
#include "tmsg.h"
9
#include "tname.h"
D
dapan1121 已提交
10
#include "dataSinkMgt.h"
D
dapan1121 已提交
11

D
dapan1121 已提交
12
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true};
13

D
dapan1121 已提交
14 15 16
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
17
  }
18

D
dapan1121 已提交
19
  int32_t code = 0;
D
dapan1121 已提交
20

D
dapan1121 已提交
21
  if (oriStatus == newStatus) {
D
dapan1121 已提交
22 23 24 25
    if (newStatus == JOB_TASK_STATUS_EXECUTING || newStatus == JOB_TASK_STATUS_FAILED) {
      *ignore = true;
      return TSDB_CODE_SUCCESS;
    }
H
Haojun Liao 已提交
26

D
dapan1121 已提交
27 28 29 30 31
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }
  
  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
D
dapan1121 已提交
32 33 34
      if (newStatus != JOB_TASK_STATUS_EXECUTING 
       && newStatus != JOB_TASK_STATUS_FAILED 
       && newStatus != JOB_TASK_STATUS_NOT_START) {
D
dapan1121 已提交
35 36 37 38 39
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      
      break;
    case JOB_TASK_STATUS_NOT_START:
D
dapan1121 已提交
40
      if (newStatus != JOB_TASK_STATUS_CANCELLED) {
D
dapan1121 已提交
41 42 43 44 45
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      
      break;
    case JOB_TASK_STATUS_EXECUTING:
D
dapan1121 已提交
46
      if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED 
D
dapan1121 已提交
47
       && newStatus != JOB_TASK_STATUS_SUCCEED 
D
dapan1121 已提交
48 49 50 51
       && newStatus != JOB_TASK_STATUS_FAILED 
       && newStatus != JOB_TASK_STATUS_CANCELLING 
       && newStatus != JOB_TASK_STATUS_CANCELLED 
       && newStatus != JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
52 53 54 55 56
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      
      break;
    case JOB_TASK_STATUS_PARTIAL_SUCCEED:
D
dapan1121 已提交
57 58
      if (newStatus != JOB_TASK_STATUS_EXECUTING 
       && newStatus != JOB_TASK_STATUS_SUCCEED
D
dapan1121 已提交
59 60 61
       && newStatus != JOB_TASK_STATUS_CANCELLED
       && newStatus != JOB_TASK_STATUS_FAILED
       && newStatus != JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
62 63 64 65 66
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      
      break;
    case JOB_TASK_STATUS_SUCCEED:
D
dapan1121 已提交
67
      if (newStatus != JOB_TASK_STATUS_CANCELLED
D
dapan1121 已提交
68 69
       && newStatus != JOB_TASK_STATUS_DROPPING
       && newStatus != JOB_TASK_STATUS_FAILED) {
D
dapan1121 已提交
70 71 72 73
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }

      break;
D
dapan1121 已提交
74
    case JOB_TASK_STATUS_FAILED:
D
dapan1121 已提交
75 76 77 78
      if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;
H
Haojun Liao 已提交
79

D
dapan1121 已提交
80 81 82 83 84 85 86
    case JOB_TASK_STATUS_CANCELLING:
      if (newStatus != JOB_TASK_STATUS_CANCELLED) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      
      break;
    case JOB_TASK_STATUS_CANCELLED:
D
dapan1121 已提交
87
    case JOB_TASK_STATUS_DROPPING:
D
dapan1121 已提交
88 89 90
      if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
D
dapan1121 已提交
91 92
      break;
      
D
dapan1121 已提交
93
    default:
D
dapan1121 已提交
94
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
D
dapan1121 已提交
95 96 97
      return TSDB_CODE_QRY_APP_ERROR;
  }

D
dapan1121 已提交
98
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
99

D
dapan1121 已提交
100
_return:
D
dapan1121 已提交
101

D
dapan1121 已提交
102
  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
D
dapan1121 已提交
103
  QW_RET(code);
D
dapan1121 已提交
104 105
}

D
dapan1121 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {

}

void qwDbgDumpMgmtInfo(SQWorkerMgmt *mgmt) {
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);
  
  QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));

  void *key = NULL;
  size_t keyLen = 0;
  int32_t i = 0;
  SQWSchStatus *sch = NULL;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
    sch = (SQWSchStatus *)pIter;
    qwDbgDumpSchInfo(sch, i);
    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));
}
D
dapan1121 已提交
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172

char *qwPhaseStr(int32_t phase) {
  switch (phase) {
    case QW_PHASE_PRE_QUERY:
      return "PRE_QUERY";
    case QW_PHASE_POST_QUERY:
      return "POST_QUERY";
    case QW_PHASE_PRE_FETCH:
      return "PRE_FETCH";
    case QW_PHASE_POST_FETCH:
      return "POST_FETCH";
    case QW_PHASE_PRE_CQUERY:
      return "PRE_CQUERY";
    case QW_PHASE_POST_CQUERY:
      return "POST_CQUERY";
    default:
      break;
  }

  return "UNKNOWN";
}

char *qwBufStatusStr(int32_t bufStatus) {
  switch (bufStatus) {
    case DS_BUF_LOW:
      return "LOW";
    case DS_BUF_FULL:
      return "FULL";
    case DS_BUF_EMPTY:
      return "EMPTY";
    default:
      break;
  }

  return "UNKNOWN";
}

D
dapan1121 已提交
173
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
D
dapan1121 已提交
174
  int32_t code = 0;
D
dapan1121 已提交
175
  int8_t origStatus = 0;
D
dapan1121 已提交
176
  bool ignore = false;
D
dapan1121 已提交
177 178 179 180

  while (true) {
    origStatus = atomic_load_8(&task->status);
    
D
dapan1121 已提交
181 182 183 184
    QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore));
    if (ignore) {
      break;
    }
D
dapan1121 已提交
185 186 187
    
    if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) {
      continue;
D
dapan1121 已提交
188
    }
D
dapan1121 已提交
189
    
D
dapan1121 已提交
190
    QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(origStatus), jobTaskStatusStr(status));
D
dapan1121 已提交
191 192

    break;
D
dapan1121 已提交
193
  }
D
dapan1121 已提交
194
  
D
dapan1121 已提交
195 196 197 198
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
199
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) {
D
dapan1121 已提交
200
  SQWSchStatus newSch = {0};
D
dapan1121 已提交
201 202
  newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == newSch.tasksHash) {
D
dapan1121 已提交
203 204
    QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
205 206
  }

D
dapan1121 已提交
207 208 209 210 211
  QW_LOCK(QW_WRITE, &mgmt->schLock);
  int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      QW_UNLOCK(QW_WRITE, &mgmt->schLock);
D
dapan1121 已提交
212
      
D
dapan1121 已提交
213 214 215
      QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
      taosHashCleanup(newSch.tasksHash);
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
216
    }
D
dapan1121 已提交
217 218

    taosHashCleanup(newSch.tasksHash);
D
dapan1121 已提交
219
  }
D
dapan1121 已提交
220
  QW_UNLOCK(QW_WRITE, &mgmt->schLock);
D
dapan1121 已提交
221

D
dapan1121 已提交
222
  return TSDB_CODE_SUCCESS;  
D
dapan1121 已提交
223 224
}

D
dapan1121 已提交
225
int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
D
dapan1121 已提交
226 227 228 229 230 231 232
  while (true) {
    QW_LOCK(rwType, &mgmt->schLock);
    *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
    if (NULL == (*sch)) {
      QW_UNLOCK(rwType, &mgmt->schLock);
      
      if (QW_NOT_EXIST_ADD == nOpt) {
D
dapan1121 已提交
233
        QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));
D
dapan1121 已提交
234 235 236 237 238 239 240

        nOpt = QW_NOT_EXIST_RET_ERR;
        
        continue;
      } else if (QW_NOT_EXIST_RET_ERR == nOpt) {
        QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
      } else {
D
dapan1121 已提交
241
        QW_SCH_ELOG("unknown notExistOpt:%d", nOpt);
D
dapan1121 已提交
242
        QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
243
      }
D
dapan1121 已提交
244
    }
D
dapan1121 已提交
245 246

    break;
D
dapan1121 已提交
247 248 249 250 251
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
252 253
int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
  return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
D
dapan1121 已提交
254 255
}

D
dapan1121 已提交
256 257
int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
  return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
D
dapan1121 已提交
258 259
}

D
dapan1121 已提交
260
void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
D
dapan1121 已提交
261 262 263
  QW_UNLOCK(rwType, &mgmt->schLock);
}

D
dapan1121 已提交
264

D
dapan1121 已提交
265
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
D
dapan1121 已提交
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

  QW_LOCK(rwType, &sch->tasksLock);
  *task = taosHashGet(sch->tasksHash, id, sizeof(id));
  if (NULL == (*task)) {
    QW_UNLOCK(rwType, &sch->tasksLock);
    QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
  }

  return TSDB_CODE_SUCCESS;
}



D
dapan1121 已提交
281
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
D
dapan1121 已提交
282 283
  int32_t code = 0;

D
dapan1121 已提交
284 285
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
D
dapan1121 已提交
286

D
dapan1121 已提交
287 288
  SQWTaskStatus ntask = {0};
  ntask.status = status;
D
dapan1121 已提交
289
  ntask.refId = rId;
D
dapan1121 已提交
290 291 292 293 294 295

  QW_LOCK(QW_WRITE, &sch->tasksLock);
  code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
  if (0 != code) {
    QW_UNLOCK(QW_WRITE, &sch->tasksLock);
    if (HASH_NODE_EXIST(code)) {
D
dapan1121 已提交
296 297
      if (rwType && task) {
        QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
D
dapan1121 已提交
298
      } else {
D
dapan1121 已提交
299
        QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
300
        QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
D
dapan1121 已提交
301 302
      }
    } else {
D
dapan1121 已提交
303
      QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
304
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
305 306 307
    }
  }
  QW_UNLOCK(QW_WRITE, &sch->tasksLock);
D
dapan1121 已提交
308

D
dapan1121 已提交
309 310
  QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status));

D
dapan1121 已提交
311 312
  if (rwType && task) {
    QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
D
dapan1121 已提交
313 314
  }

D
dapan1121 已提交
315 316
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
317

D
dapan1121 已提交
318
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
D
dapan1121 已提交
319 320
  SQWSchStatus *tsch = NULL;
  int32_t code = 0;
D
dapan1121 已提交
321
  QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch));
D
dapan1121 已提交
322

D
dapan1121 已提交
323
  QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL));
D
dapan1121 已提交
324 325 326 327

_return:

  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
328 329
  
  QW_RET(code);
D
dapan1121 已提交
330 331 332
}


D
dapan1121 已提交
333
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) {
D
dapan1121 已提交
334
  return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
D
dapan1121 已提交
335
}
D
dapan1121 已提交
336 337


D
dapan1121 已提交
338
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
D
dapan1121 已提交
339
  QW_UNLOCK(rwType, &sch->tasksLock);
D
dapan1121 已提交
340 341
}

D
dapan1121 已提交
342

D
dapan1121 已提交
343
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
D
dapan1121 已提交
344 345
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
H
Haojun Liao 已提交
346

D
dapan1121 已提交
347
  *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
D
dapan1121 已提交
348
  if (NULL == (*ctx)) {
D
dapan1121 已提交
349
    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
D
dapan1121 已提交
350
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
351 352 353 354 355
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
356
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
D
dapan1121 已提交
357 358 359 360 361
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
  
  *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == (*ctx)) {
D
dapan1121 已提交
362
    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
D
dapan1121 已提交
363
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
364 365 366 367 368
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
369
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
D
dapan1121 已提交
370 371 372
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

D
dapan1121 已提交
373
  SQWTaskCtx nctx = {0};
D
dapan1121 已提交
374

D
dapan1121 已提交
375
  int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
D
dapan1121 已提交
376 377
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
D
dapan1121 已提交
378
      if (acquire && ctx) {
D
dapan1121 已提交
379
        QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
380 381
      } else if (ctx) {
        QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
382
      } else {
D
dapan1121 已提交
383
        QW_TASK_ELOG_E("task ctx already exist");
D
dapan1121 已提交
384 385 386
        QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
      }
    } else {
D
dapan1121 已提交
387
      QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code);
D
dapan1121 已提交
388 389 390 391
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

D
dapan1121 已提交
392
  if (acquire && ctx) {
D
dapan1121 已提交
393
    QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
394 395
  } else if (ctx) {
    QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
396 397 398 399 400
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
401
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) {
D
dapan1121 已提交
402
  QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL));
D
dapan1121 已提交
403 404
}

D
dapan1121 已提交
405
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
D
dapan1121 已提交
406
  return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx);
D
dapan1121 已提交
407 408
}

D
dapan1121 已提交
409 410
void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) {
  taosHashRelease(mgmt->ctxHash, ctx);
D
dapan1121 已提交
411 412
}

D
dapan1121 已提交
413
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {  
D
dapan1121 已提交
414
  // Note: free/kill may in RC
D
dapan1121 已提交
415 416 417
  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
  if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
    qDestroyTask(otaskHandle);
D
dapan1121 已提交
418 419
  }
}
D
dapan1121 已提交
420

D
dapan1121 已提交
421 422
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  int32_t code = 0;
D
dapan1121 已提交
423
  // Note: free/kill may in RC
D
dapan1121 已提交
424 425
  qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
  if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
D
dapan1121 已提交
426
    code = qAsyncKillTask(taskHandle);
D
dapan1121 已提交
427
    atomic_store_ptr(&ctx->taskHandle, taskHandle);
D
dapan1121 已提交
428
  }
D
dapan1121 已提交
429

D
dapan1121 已提交
430 431 432
  QW_RET(code);
}

D
dapan1121 已提交
433

D
dapan1121 已提交
434
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
D
dapan1121 已提交
435 436 437 438
  tmsgReleaseHandle(ctx->ctrlConnInfo.handle, TAOS_CONN_SERVER);
  ctx->ctrlConnInfo.handle = NULL;

  // NO need to release dataConnInfo
D
dapan1121 已提交
439

D
dapan1121 已提交
440
  qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
D
dapan1121 已提交
441 442 443 444
  
  if (ctx->sinkHandle) {
    dsDestroyDataSinker(ctx->sinkHandle);
    ctx->sinkHandle = NULL;
D
dapan1121 已提交
445
  }
D
dapan1121 已提交
446
}
D
dapan1121 已提交
447 448


D
dapan1121 已提交
449
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
D
dapan1121 已提交
450 451 452
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
  SQWTaskCtx octx;
D
dapan1121 已提交
453

D
dapan1121 已提交
454 455
  SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == ctx) {
D
dapan1121 已提交
456
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
457
  }
D
dapan1121 已提交
458

D
dapan1121 已提交
459
  octx = *ctx;
D
dapan1121 已提交
460

D
dapan1121 已提交
461 462 463
  atomic_store_ptr(&ctx->taskHandle, NULL);
  atomic_store_ptr(&ctx->sinkHandle, NULL);

D
dapan1121 已提交
464 465
  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);

D
dapan1121 已提交
466
  if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
D
dapan1121 已提交
467
    QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");    
D
dapan1121 已提交
468
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
469 470
  }

D
dapan1121 已提交
471
  qwFreeTask(QW_FPARAMS(), &octx);
D
dapan1121 已提交
472 473

  QW_TASK_DLOG_E("task ctx dropped");
D
dapan1121 已提交
474
  
D
dapan1121 已提交
475 476 477
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
478
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
D
dapan1121 已提交
479
  SQWSchStatus *sch = NULL;
D
dapan1121 已提交
480 481 482 483 484
  SQWTaskStatus *task = NULL;
  int32_t code = 0;
  
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
D
dapan1121 已提交
485

D
dapan1121 已提交
486
  if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
D
dapan1121 已提交
487
    QW_TASK_WLOG_E("scheduler does not exist");
D
dapan1121 已提交
488 489
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
490

D
dapan1121 已提交
491 492 493
  if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
    qwReleaseScheduler(QW_WRITE, mgmt);
    
D
dapan1121 已提交
494
    QW_TASK_WLOG_E("task does not exist");
D
dapan1121 已提交
495 496
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
497

D
dapan1121 已提交
498
  if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
D
dapan1121 已提交
499
    QW_TASK_ELOG_E("taosHashRemove task from hash failed");
D
dapan1121 已提交
500 501 502
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
503
  QW_TASK_DLOG_E("task status dropped");
D
dapan1121 已提交
504 505 506

_return:

D
dapan1121 已提交
507 508 509
  if (task) {
    qwReleaseTaskStatus(QW_WRITE, sch);
  }
D
dapan1121 已提交
510 511 512 513 514
  qwReleaseScheduler(QW_WRITE, mgmt);
  
  QW_RET(code);
}

D
dapan1121 已提交
515
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
D
dapan1121 已提交
516 517
  SQWSchStatus *sch = NULL;
  SQWTaskStatus *task = NULL;
D
dapan1121 已提交
518 519
  int32_t code = 0;

D
dapan1121 已提交
520
  QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
D
dapan1121 已提交
521
  QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
D
dapan1121 已提交
522

D
dapan1121 已提交
523
  QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));
D
dapan1121 已提交
524
  
D
dapan1121 已提交
525 526
_return:

D
dapan1121 已提交
527 528 529
  if (task) {
    qwReleaseTaskStatus(QW_READ, sch);
  }
D
dapan1121 已提交
530
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
531 532 533 534

  QW_RET(code);
}

D
dapan1121 已提交
535
int32_t qwDropTask(QW_FPARAMS_DEF) {
H
Haojun Liao 已提交
536
  QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
D
dapan1121 已提交
537
  QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
H
Haojun Liao 已提交
538

D
dapan1121 已提交
539 540
  QW_TASK_DLOG_E("task is dropped");

D
dapan1121 已提交
541 542 543
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
544 545 546 547 548 549 550 551 552 553

int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  qTaskInfo_t *taskHandle = &ctx->taskHandle; 

  if (TASK_TYPE_TEMP == ctx->taskType) {
    if (ctx->explain) {
      SExplainExecInfo *execInfo = NULL;
      int32_t resNum = 0;
      QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));

D
dapan1121 已提交
554 555 556 557
      SQWConnInfo connInfo = {0};
      connInfo.handle = ctx->ctrlConnInfo.handle;
      
      QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
D
dapan1121 已提交
558 559 560 561 562 563 564 565 566
    }
    
    qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
567
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
D
dapan1121 已提交
568 569 570 571
  int32_t code = 0;
  bool  qcontinue = true;
  SSDataBlock* pRes = NULL;
  uint64_t useconds = 0;
D
dapan1121 已提交
572
  int32_t i = 0;
D
dapan1121 已提交
573 574 575
  int32_t execNum = 0;
  qTaskInfo_t *taskHandle = &ctx->taskHandle; 
  DataSinkHandle sinkHandle = ctx->sinkHandle;
D
dapan1121 已提交
576 577
 
  while (true) {
H
Haojun Liao 已提交
578
    QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
D
dapan1121 已提交
579

D
dapan1121 已提交
580
    code = qExecTask(*taskHandle, &pRes, &useconds);
D
dapan1121 已提交
581
    if (code) {
D
dapan1121 已提交
582 583
      QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
D
dapan1121 已提交
584 585
    }

D
dapan1121 已提交
586 587
    ++execNum;

D
dapan1121 已提交
588
    if (NULL == pRes) {
D
dapan1121 已提交
589
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%"PRIu64, useconds);
H
Haojun Liao 已提交
590

D
dapan1121 已提交
591
      dsEndPut(sinkHandle, useconds);
D
dapan1121 已提交
592 593

      QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
D
dapan1121 已提交
594 595 596 597

      if (queryEnd) {
        *queryEnd = true;
      }
D
dapan1121 已提交
598
      
D
dapan1121 已提交
599 600 601
      break;
    }

D
dapan1121 已提交
602
    int32_t rows = pRes->info.rows;
H
Haojun Liao 已提交
603

604 605
    ASSERT(pRes->info.rows > 0);

H
Haojun Liao 已提交
606
    SInputData inputData = {.pData = pRes};
D
dapan1121 已提交
607 608
    code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
    if (code) {
D
dapan1121 已提交
609 610
      QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
D
dapan1121 已提交
611
    }
D
dapan1121 已提交
612

D
dapan1121 已提交
613
    QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);
D
dapan1121 已提交
614
    
D
dapan1121 已提交
615 616 617 618
    if (!qcontinue) {
      break;
    }

D
dapan1121 已提交
619 620 621 622 623
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
      break;
    }

    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
624 625
      break;
    }
D
dapan1121 已提交
626

D
dapan1121 已提交
627 628 629
    if (atomic_load_32(&ctx->rspCode)) {
      break;
    }
D
dapan1121 已提交
630 631 632 633
  }

  QW_RET(code);
}
D
dapan1121 已提交
634

D
dapan1121 已提交
635
int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
D
dapan1121 已提交
636 637
  int32_t taskNum = 0;

D
dapan1121 已提交
638
  hbInfo->connInfo = sch->hbConnInfo;
D
dapan1121 已提交
639
  hbInfo->rsp.epId = sch->hbEpId;
D
dapan1121 已提交
640

D
dapan1121 已提交
641 642 643
  QW_LOCK(QW_READ, &sch->tasksLock);
  
  taskNum = taosHashGetSize(sch->tasksHash);
D
dapan1121 已提交
644 645 646

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
D
dapan1121 已提交
647
    QW_UNLOCK(QW_READ, &sch->tasksLock);
D
dapan1121 已提交
648
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
D
dapan1121 已提交
649 650 651 652 653 654
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  void *key = NULL;
  size_t keyLen = 0;
  int32_t i = 0;
D
dapan1121 已提交
655
  STaskStatus status = {0};
D
dapan1121 已提交
656 657 658 659

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
D
dapan1121 已提交
660
    key = taosHashGetKey(pIter, &keyLen);
D
dapan1121 已提交
661 662 663

    //TODO GET EXECUTOR API TO GET MORE INFO

D
dapan1121 已提交
664 665 666 667 668
    QW_GET_QTID(key, status.queryId, status.taskId);
    status.status = taskStatus->status;
    status.refId = taskStatus->refId;
    
    taosArrayPush(hbInfo->rsp.taskStatus, &status);
D
dapan1121 已提交
669 670 671 672 673 674 675 676 677 678
    
    ++i;
    pIter = taosHashIterate(sch->tasksHash, pIter);
  }  

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
679 680

int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
D
dapan1121 已提交
681 682 683
  int32_t len = 0;
  SRetrieveTableRsp *rsp = NULL;
  bool queryEnd = false;
D
dapan1121 已提交
684 685
  int32_t code = 0;

D
dapan1121 已提交
686
  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
D
dapan1121 已提交
687

D
dapan1121 已提交
688 689 690 691
  if (len < 0) {
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
D
dapan1121 已提交
692

D
dapan1121 已提交
693 694
  if (len == 0) {
    if (queryEnd) {
D
dapan 已提交
695
      code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
696
      if (code) {
D
dapan1121 已提交
697
        QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
698 699 700
        QW_ERR_RET(code);
      }
    
D
dapan1121 已提交
701
      QW_TASK_DLOG_E("no data in sink and query end");
H
Haojun Liao 已提交
702

D
dapan1121 已提交
703
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
L
Liu Jicong 已提交
704
      QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
705

D
dapan1121 已提交
706
      *rspMsg = rsp;
D
dapan 已提交
707
      *dataLen = 0;
D
dapan1121 已提交
708 709
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
710 711

    pOutput->bufStatus = DS_BUF_EMPTY;
D
dapan1121 已提交
712

D
dapan1121 已提交
713
    return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
714
  }
D
dapan1121 已提交
715

D
dapan1121 已提交
716
  // Got data from sink
D
dapan1121 已提交
717
  QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
D
dapan1121 已提交
718

D
dapan 已提交
719
  *dataLen = len;
D
dapan1121 已提交
720
  
D
dapan1121 已提交
721
  QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
D
dapan 已提交
722
  *rspMsg = rsp;
D
dapan1121 已提交
723
  
D
dapan 已提交
724 725
  pOutput->pData = rsp->data;
  code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
726
  if (code) {
D
dapan1121 已提交
727
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
728 729
    QW_ERR_RET(code);
  }
D
dapan1121 已提交
730

D
dapan1121 已提交
731
  if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
D
dapan1121 已提交
732
    QW_TASK_DLOG_E("task all data fetched, done");
D
dapan1121 已提交
733
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
D
dapan1121 已提交
734 735
  }

D
dapan1121 已提交
736
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
737 738
}

D
dapan1121 已提交
739
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
D
dapan1121 已提交
740 741
  int32_t code = 0;
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
742 743
  SQWConnInfo *dropConnection = NULL;
  SQWConnInfo *cancelConnection = NULL;
D
dapan1121 已提交
744

D
dapan1121 已提交
745
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
746 747 748 749 750 751 752 753

  if (QW_PHASE_PRE_QUERY == phase) {
    QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
  } else {
    QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
  }
  
  QW_LOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
754

D
dapan1121 已提交
755
  if (QW_PHASE_PRE_FETCH == phase) {
X
Xiaoyu Wang 已提交
756
    atomic_store_8((int8_t*)&ctx->queryFetched, true);
D
dapan1121 已提交
757
  } else {
D
dapan1121 已提交
758 759
    atomic_store_8(&ctx->phase, phase);
  }
D
dapan1121 已提交
760

X
Xiaoyu Wang 已提交
761
  if (atomic_load_8((int8_t*)&ctx->queryEnd)) {
D
dapan1121 已提交
762 763 764
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }
D
dapan1121 已提交
765

D
dapan1121 已提交
766 767
  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
D
dapan1121 已提交
768
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
H
Haojun Liao 已提交
769
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
770
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
771 772
        break;
      }
D
dapan1121 已提交
773

D
dapan1121 已提交
774
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
775
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
776
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
777
        dropConnection = NULL;
778

D
dapan1121 已提交
779 780
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
781

D
dapan1121 已提交
782
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan 已提交
783
        break;
D
dapan1121 已提交
784
      }
D
dapan1121 已提交
785

D
dapan1121 已提交
786
      QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
D
dapan1121 已提交
787 788
      break;
    }
D
dapan1121 已提交
789
    case QW_PHASE_PRE_FETCH: {
D
dapan1121 已提交
790 791
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
792 793
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }
D
dapan1121 已提交
794

D
dapan1121 已提交
795
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
796
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
797 798 799 800
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

      if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
801
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
802
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
D
dapan1121 已提交
803 804 805
      }
      break;
    }    
D
dapan1121 已提交
806 807
    case QW_PHASE_PRE_CQUERY: {
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
808
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
809
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
810
      }
D
dapan1121 已提交
811

D
dapan1121 已提交
812
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
813
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
814
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
815
        dropConnection = NULL;
H
Haojun Liao 已提交
816

D
dapan1121 已提交
817 818
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
819

D
dapan1121 已提交
820
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
821
      }
D
dapan1121 已提交
822

D
dapan1121 已提交
823
      break;
D
dapan1121 已提交
824 825 826 827 828 829 830 831 832
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
    QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
833
  }
D
dapan1121 已提交
834

D
dapan1121 已提交
835
_return:
D
dapan1121 已提交
836

D
dapan1121 已提交
837
  if (ctx) {
D
dapan1121 已提交
838
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
839
    
D
dapan1121 已提交
840
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
841 842
    qwReleaseTaskCtx(mgmt, ctx);
  }
D
dapan1121 已提交
843

D
dapan1121 已提交
844
  if (dropConnection) {
S
shm  
Shengliang Guan 已提交
845
    qwBuildAndSendDropRsp(dropConnection, code);
D
dapan1121 已提交
846
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
847
  }
D
dapan1121 已提交
848

D
dapan1121 已提交
849
  if (cancelConnection) {
S
shm  
Shengliang Guan 已提交
850
    qwBuildAndSendCancelRsp(cancelConnection, code);
D
dapan1121 已提交
851
    QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
852 853
  }

D
dapan1121 已提交
854
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
855 856 857 858 859 860 861 862

  QW_RET(code);
}


int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
  int32_t code = 0;
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
863
  SQWConnInfo connInfo = {0};
D
dapan1121 已提交
864
  SQWConnInfo *readyConnection = NULL;
D
dapan1121 已提交
865

D
dapan1121 已提交
866
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
867 868 869 870 871 872
  
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
  
  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
873
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
874 875 876 877
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (QW_PHASE_POST_QUERY == phase) {
D
dapan1121 已提交
878
#if 0    
D
dapan1121 已提交
879
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
880
      readyConnection = &ctx->connInfo;
D
dapan1121 已提交
881 882
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
    }
D
dapan1121 已提交
883
#else
D
dapan1121 已提交
884
    connInfo.handle = ctx->ctrlConnInfo.handle;
D
dapan1121 已提交
885 886 887 888
    readyConnection = &connInfo;
    
    QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
#endif
D
dapan1121 已提交
889 890
  }

D
dapan1121 已提交
891
  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
892 893 894 895
    if (QW_PHASE_POST_FETCH == phase) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }
D
dapan1121 已提交
896

D
dapan1121 已提交
897 898
    qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
899

D
dapan1121 已提交
900 901
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
902 903 904
  }

  if (ctx->rspCode) {
D
dapan1121 已提交
905 906
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
907 908
  }      

D
dapan1121 已提交
909
  QW_ERR_JRET(input->code);
D
dapan1121 已提交
910 911 912

_return:

D
dapan1121 已提交
913 914 915 916
  if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
  }

D
dapan1121 已提交
917
  if (ctx) {
D
dapan1121 已提交
918
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
919

D
dapan1121 已提交
920 921 922
    if (QW_PHASE_POST_FETCH != phase) {
      atomic_store_8(&ctx->phase, phase);
    }
D
dapan1121 已提交
923
    
D
dapan1121 已提交
924
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
925
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
926 927
  }

D
dapan1121 已提交
928
  if (readyConnection) {
S
shm  
Shengliang Guan 已提交
929
    qwBuildAndSendReadyRsp(readyConnection, code);    
D
dapan1121 已提交
930
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
931 932
  }

D
dapan1121 已提交
933 934
  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
D
dapan1121 已提交
935 936
  }

D
dapan1121 已提交
937
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
938

D
dapan1121 已提交
939 940 941
  QW_RET(code);
}

D
dapan1121 已提交
942
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
D
dapan1121 已提交
943 944 945 946
  int32_t code = 0;
  bool queryRsped = false;
  struct SSubplan *plan = NULL;
  SQWPhaseInput input = {0};
D
dapan1121 已提交
947 948
  qTaskInfo_t pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
D
dapan1121 已提交
949
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
950

D
dapan1121 已提交
951 952
  QW_ERR_JRET(qwRegisterBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));

D
dapan1121 已提交
953
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
D
dapan1121 已提交
954 955 956 957

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
  
  atomic_store_8(&ctx->taskType, taskType);
D
dapan1121 已提交
958
  atomic_store_8(&ctx->explain, explain);
X
Xiaoyu Wang 已提交
959

D
dapan1121 已提交
960 961
  atomic_store_ptr(&ctx->ctrlConnInfo.handle, qwMsg->connInfo.handle);
  atomic_store_ptr(&ctx->ctrlConnInfo.ahandle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
962 963

  QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
X
Xiaoyu Wang 已提交
964

D
dapan1121 已提交
965 966
  code = qStringToSubplan(qwMsg->msg, &plan);
  if (TSDB_CODE_SUCCESS != code) {
D
dapan1121 已提交
967
    QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
968
    QW_ERR_JRET(code);
D
dapan1121 已提交
969
  }
D
dapan1121 已提交
970
  
971
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
972
  if (code) {
D
dapan1121 已提交
973
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
974
    QW_ERR_JRET(code);
D
dapan1121 已提交
975
  }
D
dapan1121 已提交
976

H
Haojun Liao 已提交
977
  if (NULL == sinkHandle || NULL == pTaskInfo) {
D
dapan1121 已提交
978 979 980 981
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
982 983
  //QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
  //QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
984

D
dapan1121 已提交
985
  //queryRsped = true;
D
dapan1121 已提交
986

D
dapan1121 已提交
987 988 989
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

D
dapan1121 已提交
990
  if (pTaskInfo && sinkHandle) {
D
dapan1121 已提交
991
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
D
dapan1121 已提交
992
  }
D
dapan1121 已提交
993
  
D
dapan1121 已提交
994 995
_return:

D
dapan1121 已提交
996 997
  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
998

D
dapan1121 已提交
999 1000 1001 1002
  //if (!queryRsped) {
  //  qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
  //  QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  //}
D
dapan1121 已提交
1003

D
dapan1121 已提交
1004
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1005 1006
}

D
dapan1121 已提交
1007
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
D
dapan1121 已提交
1008 1009
  int32_t code = 0;
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
1010
  int8_t phase = 0;
D
dapan1121 已提交
1011
  bool needRsp = true;
D
dapan1121 已提交
1012 1013

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1014

D
dapan1121 已提交
1015 1016
  QW_LOCK(QW_WRITE, &ctx->lock);

D
dapan1121 已提交
1017 1018 1019
  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG_E("task is dropping or already dropped");
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
1020
  }
D
dapan1121 已提交
1021
  
D
dapan1121 已提交
1022
  if (ctx->phase == QW_PHASE_PRE_QUERY) {
D
dapan1121 已提交
1023 1024
    ctx->ctrlConnInfo.handle == qwMsg->connInfo.handle;
    ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
D
dapan1121 已提交
1025
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
D
dapan1121 已提交
1026 1027 1028 1029 1030 1031 1032
    needRsp = false;
    QW_TASK_DLOG_E("ready msg will not rsp now");
    goto _return;
  }

  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);

X
Xiaoyu Wang 已提交
1033 1034
  if (atomic_load_8((int8_t*)&ctx->queryEnd) || atomic_load_8((int8_t*)&ctx->queryFetched)) {
    QW_TASK_ELOG("got ready msg at wrong status, queryEnd:%d, queryFetched:%d", atomic_load_8((int8_t*)&ctx->queryEnd), atomic_load_8((int8_t*)&ctx->queryFetched));
D
dapan1121 已提交
1035 1036 1037
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }

D
dapan1121 已提交
1038 1039 1040
  if (ctx->phase == QW_PHASE_POST_QUERY) {
    code = ctx->rspCode;
    goto _return;
D
dapan1121 已提交
1041 1042
  }

D
dapan1121 已提交
1043
  QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase));
H
Haojun Liao 已提交
1044

D
dapan1121 已提交
1045
  QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
1046 1047 1048

_return:

D
dapan1121 已提交
1049
  if (code && ctx) {
D
dapan1121 已提交
1050
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
1051 1052
  }

D
dapan1121 已提交
1053 1054 1055
  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
  }
H
Haojun Liao 已提交
1056

D
dapan1121 已提交
1057 1058
  if (ctx) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
1059
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
1060 1061
  }

D
dapan1121 已提交
1062
  if (needRsp) {
S
shm  
Shengliang Guan 已提交
1063
    qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
D
dapan1121 已提交
1064
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
1065 1066
  }

D
dapan1121 已提交
1067
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1068 1069 1070
}


D
dapan1121 已提交
1071
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
D
dapan1121 已提交
1072
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
1073
  int32_t code = 0;
1074 1075 1076
  SQWPhaseInput input = {0};
  void *rsp = NULL;
  int32_t dataLen = 0;
D
dapan1121 已提交
1077
  bool queryEnd = false;
D
dapan1121 已提交
1078 1079
  
  do {
D
dapan1121 已提交
1080
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
D
dapan1121 已提交
1081

D
dapan1121 已提交
1082
    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1083

X
Xiaoyu Wang 已提交
1084 1085
    atomic_store_8((int8_t*)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t*)&ctx->queryContinue, 0);
D
dapan1121 已提交
1086

D
dapan1121 已提交
1087
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));
D
dapan1121 已提交
1088

D
dapan1121 已提交
1089 1090 1091 1092 1093
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      SOutputData sOutput = {0};
      QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
      
      if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {    
D
dapan1121 已提交
1094
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
D
dapan1121 已提交
1095
        
X
Xiaoyu Wang 已提交
1096
        atomic_store_8((int8_t*)&ctx->queryContinue, 1);
1097
      }
D
dapan1121 已提交
1098 1099
      
      if (rsp) {
D
dapan1121 已提交
1100
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
D
dapan1121 已提交
1101
        
D
dapan1121 已提交
1102
        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
1103 1104 1105
        if (qComplete) {
          atomic_store_8((int8_t*)&ctx->queryEnd, true);
        }
H
Haojun Liao 已提交
1106

D
dapan1121 已提交
1107
        qwMsg->connInfo = ctx->dataConnInfo;
D
dapan1121 已提交
1108
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);            
1109
        
S
shm  
Shengliang Guan 已提交
1110
        qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);                
D
dapan1121 已提交
1111
        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
D
dapan1121 已提交
1112
      } else {
X
Xiaoyu Wang 已提交
1113
        atomic_store_8((int8_t*)&ctx->queryContinue, 1);
1114 1115 1116
      }
    }

D
dapan1121 已提交
1117
_return:
1118

D
dapan1121 已提交
1119 1120 1121 1122
    if (NULL == ctx) {
      break;
    }

D
dapan1121 已提交
1123
    if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
1124
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);    
1125 1126
      qwFreeFetchRsp(rsp);
      rsp = NULL;
D
dapan1121 已提交
1127
      
D
dapan1121 已提交
1128
      qwMsg->connInfo = ctx->dataConnInfo;
S
shm  
Shengliang Guan 已提交
1129
      qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
D
dapan1121 已提交
1130
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0);
1131
    }
D
dapan1121 已提交
1132

D
dapan1121 已提交
1133
    QW_LOCK(QW_WRITE, &ctx->lock);
X
Xiaoyu Wang 已提交
1134
    if (queryEnd || code || 0 == atomic_load_8((int8_t*)&ctx->queryContinue)) {
D
dapan1121 已提交
1135
      // Note: if necessary, fetch need to put cquery to queue again
D
dapan1121 已提交
1136 1137 1138 1139 1140 1141
      atomic_store_8(&ctx->phase, 0);
      QW_UNLOCK(QW_WRITE,&ctx->lock);
      break;
    }
    QW_UNLOCK(QW_WRITE,&ctx->lock);
  } while (true);
D
dapan1121 已提交
1142

D
dapan1121 已提交
1143
  input.code = code;
D
dapan1121 已提交
1144 1145 1146
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);    

  QW_RET(TSDB_CODE_SUCCESS);  
D
dapan1121 已提交
1147
}
D
dapan1121 已提交
1148

D
dapan1121 已提交
1149

D
dapan1121 已提交
1150
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
D
dapan1121 已提交
1151
  int32_t code = 0;
D
dapan1121 已提交
1152 1153 1154 1155 1156
  int32_t dataLen = 0;
  bool locked = false;
  SQWTaskCtx *ctx = NULL;
  void *rsp = NULL;
  SQWPhaseInput input = {0};
D
dapan1121 已提交
1157

D
dapan1121 已提交
1158
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
1159

1160
  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1161
  
D
dapan 已提交
1162 1163
  SOutputData sOutput = {0};
  QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
D
dapan1121 已提交
1164

1165
  if (NULL == rsp) {
D
dapan1121 已提交
1166 1167
    atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle);
    atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
1168
    
1169
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
D
dapan1121 已提交
1170
  } else {
D
dapan1121 已提交
1171
    bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
D
dapan1121 已提交
1172
    
D
dapan1121 已提交
1173
    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
1174 1175 1176
    if (qComplete) {
      atomic_store_8((int8_t*)&ctx->queryEnd, true);
    }
D
dapan1121 已提交
1177 1178
  }

D
dapan1121 已提交
1179
  if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {    
D
dapan1121 已提交
1180
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
D
dapan1121 已提交
1181

D
dapan1121 已提交
1182 1183
    QW_LOCK(QW_WRITE, &ctx->lock);
    locked = true;
1184

D
dapan1121 已提交
1185
    // RC WARNING
D
dapan1121 已提交
1186
    if (QW_IS_QUERY_RUNNING(ctx)) {
X
Xiaoyu Wang 已提交
1187 1188
      atomic_store_8((int8_t*)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t*)&ctx->queryInQueue)) {
H
Haojun Liao 已提交
1189
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);
D
dapan1121 已提交
1190

X
Xiaoyu Wang 已提交
1191
      atomic_store_8((int8_t*)&ctx->queryInQueue, 1);
1192
      
D
dapan1121 已提交
1193
      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
1194
    }
D
dapan 已提交
1195 1196
  }
  
D
dapan1121 已提交
1197
_return:
D
dapan1121 已提交
1198

D
dapan1121 已提交
1199 1200 1201 1202 1203
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
D
dapan1121 已提交
1204
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
D
dapan1121 已提交
1205

D
dapan 已提交
1206 1207 1208
  if (code) {
    qwFreeFetchRsp(rsp);
    rsp = NULL;
D
dapan1121 已提交
1209
    dataLen = 0;
D
dapan1121 已提交
1210 1211 1212
  }

  if (code || rsp) {
S
shm  
Shengliang Guan 已提交
1213
    qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
D
dapan1121 已提交
1214
    QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
D
dapan1121 已提交
1215 1216
  }

D
dapan1121 已提交
1217
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1218
}
D
dapan1121 已提交
1219

1220

D
dapan1121 已提交
1221
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
D
dapan1121 已提交
1222
  int32_t code = 0;
D
dapan1121 已提交
1223
  bool rsped = false;
D
dapan1121 已提交
1224 1225 1226
  SQWTaskCtx *ctx = NULL;
  bool locked = false;

D
dapan1121 已提交
1227 1228
  // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED

D
dapan1121 已提交
1229
  QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1230
  
D
dapan1121 已提交
1231 1232 1233 1234 1235
  QW_LOCK(QW_WRITE, &ctx->lock);

  locked = true;

  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
1236
    QW_TASK_WLOG_E("task already dropping");
D
dapan1121 已提交
1237 1238 1239
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

D
dapan1121 已提交
1240
  if (QW_IS_QUERY_RUNNING(ctx)) {
D
dapan1121 已提交
1241
    QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
D
dapan1121 已提交
1242
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
D
dapan1121 已提交
1243
  } else if (ctx->phase > 0) {
D
dapan1121 已提交
1244 1245 1246 1247
    if (0 == qwMsg->code) {
      qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
      QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
    }
1248

D
dapan1121 已提交
1249
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
1250
    rsped = true;
D
dapan1121 已提交
1251 1252
  } else {
    // task not started
D
dapan1121 已提交
1253
  }
D
dapan1121 已提交
1254

D
dapan1121 已提交
1255
  if (!rsped) {
D
dapan1121 已提交
1256 1257
    ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
    ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
D
dapan 已提交
1258
    
D
dapan1121 已提交
1259 1260
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }
1261

D
dapan1121 已提交
1262
_return:
D
dapan1121 已提交
1263

D
dapan1121 已提交
1264
  if (code) {
D
dapan1121 已提交
1265 1266 1267
    if (ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }
H
Haojun Liao 已提交
1268

D
dapan1121 已提交
1269
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
D
dapan1121 已提交
1270 1271
  }

D
dapan 已提交
1272 1273 1274 1275
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

D
dapan1121 已提交
1276
  if (ctx) {
D
dapan1121 已提交
1277
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
1278 1279
  }

D
dapan1121 已提交
1280
  if ((TSDB_CODE_SUCCESS != code) && (0 == qwMsg->code)) {
S
shm  
Shengliang Guan 已提交
1281
    qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
D
dapan1121 已提交
1282
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
1283
  }
D
dapan1121 已提交
1284

D
dapan1121 已提交
1285
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1286
}
D
dapan1121 已提交
1287

D
dapan1121 已提交
1288 1289 1290 1291 1292
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t code = 0;
  SSchedulerHbRsp rsp = {0};
  SQWSchStatus *sch = NULL;
  uint64_t seqId = 0;
D
dapan1121 已提交
1293
  void *origHandle = NULL;
D
dapan1121 已提交
1294 1295 1296 1297 1298

  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
  
  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

D
dapan1121 已提交
1299
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
1300

D
dapan1121 已提交
1301
  if (sch->hbConnInfo.handle) {
S
shm  
Shengliang Guan 已提交
1302
    tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
D
dapan1121 已提交
1303
  }
D
dapan1121 已提交
1304
  
D
dapan1121 已提交
1305
  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
D
dapan1121 已提交
1306
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
D
dapan1121 已提交
1307
  
D
dapan1121 已提交
1308
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
1309 1310 1311
  
  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p",
    req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
1312

D
dapan1121 已提交
1313 1314 1315 1316
  qwReleaseScheduler(QW_READ, mgmt);

_return:

S
shm  
Shengliang Guan 已提交
1317
  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
D
dapan1121 已提交
1318
  QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
1319
  
D
dapan1121 已提交
1320
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1321 1322 1323 1324
}


void qwProcessHbTimerEvent(void *param, void *tmrId) {
D
dapan1121 已提交
1325 1326 1327
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
  SQWSchStatus *sch = NULL;
  int32_t taskNum = 0;
D
dapan1121 已提交
1328
  SQWHbInfo *rspList = NULL;
D
dapan1121 已提交
1329 1330
  int32_t code = 0;

D
dapan1121 已提交
1331 1332
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
1333 1334 1335 1336 1337
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1338 1339
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
    return;
D
dapan1121 已提交
1340 1341
  }

wafwerar's avatar
wafwerar 已提交
1342
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
1343 1344
  if (NULL == rspList) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1345 1346 1347
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
    return;
D
dapan1121 已提交
1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370
  }

  void *key = NULL;
  size_t keyLen = 0;
  int32_t i = 0;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
1371
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
1372 1373
    QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),
            (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
D
dapan1121 已提交
1374
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
1375 1376
  }

wafwerar's avatar
wafwerar 已提交
1377
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
1378

D
dapan1121 已提交
1379
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);  
D
dapan1121 已提交
1380 1381
}

S
Shengliang Guan 已提交
1382 1383
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
  if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) {
D
dapan1121 已提交
1384 1385 1386
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
S
Shengliang 已提交
1387

D
dapan1121 已提交
1388
  int32_t code = 0;
wafwerar's avatar
wafwerar 已提交
1389
  SQWorkerMgmt *mgmt = taosMemoryCalloc(1, sizeof(SQWorkerMgmt));
D
dapan1121 已提交
1390 1391
  if (NULL == mgmt) {
    qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
D
dapan1121 已提交
1392
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1393 1394 1395 1396
  }

  if (cfg) {
    mgmt->cfg = *cfg;
D
dapan1121 已提交
1397
    if (0 == mgmt->cfg.maxSchedulerNum) {
D
dapan1121 已提交
1398
      mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
D
dapan1121 已提交
1399 1400
    }
    if (0 == mgmt->cfg.maxTaskNum) {
D
dapan1121 已提交
1401
      mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
D
dapan1121 已提交
1402 1403
    }
    if (0 == mgmt->cfg.maxSchTaskNum) {
D
dapan1121 已提交
1404
      mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1405
    }
D
dapan1121 已提交
1406
  } else {
D
dapan1121 已提交
1407 1408 1409
    mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
    mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
    mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1410 1411
  }

1412
  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1413
  if (NULL == mgmt->schHash) {
wafwerar's avatar
wafwerar 已提交
1414
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1415
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
D
dapan1121 已提交
1416
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1417 1418
  }

1419
  mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1420
  if (NULL == mgmt->ctxHash) {
D
dapan1121 已提交
1421
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
D
dapan1121 已提交
1422 1423 1424 1425 1426 1427 1428 1429 1430
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1431
  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer);
D
dapan1121 已提交
1432 1433 1434
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1435 1436
  }

D
dapan1121 已提交
1437 1438
  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
S
Shengliang Guan 已提交
1439
  mgmt->msgCb = *pMsgCb;
D
dapan1121 已提交
1440

D
dapan1121 已提交
1441 1442
  *qWorkerMgmt = mgmt;

D
dapan1121 已提交
1443 1444
  qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);

D
dapan1121 已提交
1445
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1446 1447 1448 1449 1450 1451 1452 1453

_return:

  taosHashCleanup(mgmt->schHash);
  taosHashCleanup(mgmt->ctxHash);

  taosTmrCleanUp(mgmt->timer);
  
wafwerar's avatar
wafwerar 已提交
1454
  taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1455 1456

  QW_RET(code);
D
dapan1121 已提交
1457
}
D
dapan1121 已提交
1458 1459 1460 1461

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1462
  }
D
dapan 已提交
1463

D
dapan1121 已提交
1464
  SQWorkerMgmt *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1465 1466 1467

  taosTmrStopA(&mgmt->hbTimer);
  taosTmrCleanUp(mgmt->timer);
D
dapan1121 已提交
1468
  
D
dapan1121 已提交
1469
  //TODO STOP ALL QUERY
D
dapan1121 已提交
1470

D
dapan1121 已提交
1471
  //TODO FREE ALL
D
dapan1121 已提交
1472

wafwerar's avatar
wafwerar 已提交
1473
  taosMemoryFreeClear(*qWorkerMgmt);
D
dapan1121 已提交
1474
}
D
dapan1121 已提交
1475

D
dapan1121 已提交
1476
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
D
dapan1121 已提交
1477
/*
D
dapan1121 已提交
1478 1479
  SQWSchStatus *sch = NULL;
  int32_t taskNum = 0;
1480

D
dapan1121 已提交
1481
  QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
D
dapan1121 已提交
1482 1483
  
  sch->lastAccessTs = taosGetTimestampSec();
1484

D
dapan1121 已提交
1485 1486 1487 1488 1489
  QW_LOCK(QW_READ, &sch->tasksLock);
  
  taskNum = taosHashGetSize(sch->tasksHash);
  
  int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
wafwerar's avatar
wafwerar 已提交
1490
  *rsp = taosMemoryCalloc(1, size);
D
dapan1121 已提交
1491
  if (NULL == *rsp) {
D
dapan1121 已提交
1492
    QW_SCH_ELOG("calloc %d failed", size);
D
dapan1121 已提交
1493 1494 1495 1496 1497
    QW_UNLOCK(QW_READ, &sch->tasksLock);
    qwReleaseScheduler(QW_READ, mgmt);
    
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
1498

D
dapan1121 已提交
1499 1500 1501
  void *key = NULL;
  size_t keyLen = 0;
  int32_t i = 0;
1502

D
dapan1121 已提交
1503 1504 1505 1506
  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
    taosHashGetKey(pIter, &key, &keyLen);
D
dapan1121 已提交
1507

D
dapan1121 已提交
1508 1509 1510
    QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
    (*rsp)->status[i].status = taskStatus->status;
    
D
dapan1121 已提交
1511
    ++i;
D
dapan1121 已提交
1512
    pIter = taosHashIterate(sch->tasksHash, pIter);
D
dapan1121 已提交
1513
  }  
D
dapan1121 已提交
1514

D
dapan1121 已提交
1515 1516 1517 1518
  QW_UNLOCK(QW_READ, &sch->tasksLock);
  qwReleaseScheduler(QW_READ, mgmt);

  (*rsp)->num = taskNum;
D
dapan1121 已提交
1519
*/
D
dapan1121 已提交
1520 1521 1522 1523
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1524

D
dapan1121 已提交
1525 1526
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
  SQWSchStatus *sch = NULL;
D
dapan1121 已提交
1527

D
dapan1121 已提交
1528
/*
D
dapan1121 已提交
1529
  QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
D
dapan1121 已提交
1530

D
dapan1121 已提交
1531
  sch->lastAccessTs = taosGetTimestampSec();
D
dapan1121 已提交
1532

D
dapan1121 已提交
1533
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1534
*/
D
dapan1121 已提交
1535 1536 1537 1538
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1539 1540 1541 1542
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
  SQWSchStatus *sch = NULL;
  SQWTaskStatus *task = NULL;
  int32_t code = 0;
D
dapan1121 已提交
1543 1544

/*  
D
dapan1121 已提交
1545 1546 1547 1548
  if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
    *taskStatus = JOB_TASK_STATUS_NULL;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1549

D
dapan1121 已提交
1550 1551 1552 1553 1554 1555
  if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) {
    qwReleaseScheduler(QW_READ, mgmt);
    
    *taskStatus = JOB_TASK_STATUS_NULL;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1556

D
dapan1121 已提交
1557
  *taskStatus = task->status;
D
dapan1121 已提交
1558

D
dapan1121 已提交
1559 1560
  qwReleaseTask(QW_READ, sch);
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1561
*/
D
dapan1121 已提交
1562 1563 1564 1565 1566

  QW_RET(code);
}


D
dapan1121 已提交
1567 1568 1569
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
  SQWSchStatus *sch = NULL;
  SQWTaskStatus *task = NULL;
D
dapan1121 已提交
1570
  int32_t code = 0;
D
dapan1121 已提交
1571

D
dapan1121 已提交
1572
/*
D
dapan1121 已提交
1573
  QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
D
dapan1121 已提交
1574

D
dapan1121 已提交
1575
  QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
D
dapan1121 已提交
1576

D
dapan1121 已提交
1577

D
dapan1121 已提交
1578
  QW_LOCK(QW_WRITE, &task->lock);
D
dapan1121 已提交
1579

D
dapan1121 已提交
1580 1581 1582 1583 1584 1585 1586
  task->cancel = true;
  
  int8_t oriStatus = task->status;
  int8_t newStatus = 0;
  
  if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START || task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) {
    QW_UNLOCK(QW_WRITE, &task->lock);
D
dapan1121 已提交
1587

D
dapan1121 已提交
1588 1589 1590 1591 1592 1593 1594 1595
    qwReleaseTask(QW_READ, sch);
    qwReleaseScheduler(QW_READ, mgmt);
    
    return TSDB_CODE_SUCCESS;
  } else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
    QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
  } else {
    QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLING));
D
dapan1121 已提交
1596 1597
  }

D
dapan1121 已提交
1598 1599 1600 1601
  QW_UNLOCK(QW_WRITE, &task->lock);
  
  qwReleaseTask(QW_READ, sch);
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1602

D
dapan1121 已提交
1603 1604 1605 1606 1607
  if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
    //TODO call executer to cancel subquery async
  }
  
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1608 1609

_return:
D
dapan1121 已提交
1610

D
dapan1121 已提交
1611 1612 1613 1614 1615 1616 1617 1618 1619
  if (task) {
    QW_UNLOCK(QW_WRITE, &task->lock);
    
    qwReleaseTask(QW_READ, sch);
  }

  if (sch) {
    qwReleaseScheduler(QW_READ, mgmt);
  }
D
dapan1121 已提交
1620
*/
D
dapan1121 已提交
1621

D
dapan1121 已提交
1622
  QW_RET(code);
D
dapan1121 已提交
1623 1624
}

D
dapan1121 已提交
1625