qworker.c 44.0 KB
Newer Older
D
dapan1121 已提交
1
#include "qworker.h"
S
common  
Shengliang Guan 已提交
2
#include "tcommon.h"
3
#include "executor.h"
D
dapan1121 已提交
4
#include "planner.h"
H
Haojun Liao 已提交
5 6
#include "query.h"
#include "qworkerInt.h"
D
dapan1121 已提交
7
#include "qworkerMsg.h"
H
Haojun Liao 已提交
8
#include "tmsg.h"
9
#include "tname.h"
D
dapan1121 已提交
10
#include "dataSinkMgt.h"
D
dapan1121 已提交
11

D
dapan1121 已提交
12
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true};
13

D
dapan1121 已提交
14 15 16
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
17
  }
18

D
dapan1121 已提交
19
  int32_t code = 0;
D
dapan1121 已提交
20

D
dapan1121 已提交
21
  if (oriStatus == newStatus) {
D
dapan1121 已提交
22 23 24 25
    if (newStatus == JOB_TASK_STATUS_EXECUTING || newStatus == JOB_TASK_STATUS_FAILED) {
      *ignore = true;
      return TSDB_CODE_SUCCESS;
    }
H
Haojun Liao 已提交
26

D
dapan1121 已提交
27 28 29 30 31
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }
  
  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
D
dapan1121 已提交
32 33 34
      if (newStatus != JOB_TASK_STATUS_EXECUTING 
       && newStatus != JOB_TASK_STATUS_FAILED 
       && newStatus != JOB_TASK_STATUS_NOT_START) {
D
dapan1121 已提交
35 36 37 38 39
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      
      break;
    case JOB_TASK_STATUS_NOT_START:
D
dapan1121 已提交
40
      if (newStatus != JOB_TASK_STATUS_CANCELLED) {
D
dapan1121 已提交
41 42 43 44 45
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      
      break;
    case JOB_TASK_STATUS_EXECUTING:
D
dapan1121 已提交
46
      if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED 
D
dapan1121 已提交
47
       && newStatus != JOB_TASK_STATUS_SUCCEED 
D
dapan1121 已提交
48 49 50 51
       && newStatus != JOB_TASK_STATUS_FAILED 
       && newStatus != JOB_TASK_STATUS_CANCELLING 
       && newStatus != JOB_TASK_STATUS_CANCELLED 
       && newStatus != JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
52 53 54 55 56
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      
      break;
    case JOB_TASK_STATUS_PARTIAL_SUCCEED:
D
dapan1121 已提交
57 58
      if (newStatus != JOB_TASK_STATUS_EXECUTING 
       && newStatus != JOB_TASK_STATUS_SUCCEED
D
dapan1121 已提交
59 60 61
       && newStatus != JOB_TASK_STATUS_CANCELLED
       && newStatus != JOB_TASK_STATUS_FAILED
       && newStatus != JOB_TASK_STATUS_DROPPING) {
D
dapan1121 已提交
62 63 64 65 66
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      
      break;
    case JOB_TASK_STATUS_SUCCEED:
D
dapan1121 已提交
67
      if (newStatus != JOB_TASK_STATUS_CANCELLED
D
dapan1121 已提交
68 69
       && newStatus != JOB_TASK_STATUS_DROPPING
       && newStatus != JOB_TASK_STATUS_FAILED) {
D
dapan1121 已提交
70 71 72 73
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }

      break;
D
dapan1121 已提交
74
    case JOB_TASK_STATUS_FAILED:
D
dapan1121 已提交
75 76 77 78
      if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;
H
Haojun Liao 已提交
79

D
dapan1121 已提交
80 81 82 83 84 85 86
    case JOB_TASK_STATUS_CANCELLING:
      if (newStatus != JOB_TASK_STATUS_CANCELLED) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      
      break;
    case JOB_TASK_STATUS_CANCELLED:
D
dapan1121 已提交
87
    case JOB_TASK_STATUS_DROPPING:
D
dapan1121 已提交
88 89 90
      if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
D
dapan1121 已提交
91 92
      break;
      
D
dapan1121 已提交
93
    default:
D
dapan1121 已提交
94
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
D
dapan1121 已提交
95 96 97
      return TSDB_CODE_QRY_APP_ERROR;
  }

D
dapan1121 已提交
98
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
99

D
dapan1121 已提交
100
_return:
D
dapan1121 已提交
101

D
dapan1121 已提交
102
  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
D
dapan1121 已提交
103
  QW_RET(code);
D
dapan1121 已提交
104 105
}

D
dapan1121 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {

}

void qwDbgDumpMgmtInfo(SQWorkerMgmt *mgmt) {
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);
  
  QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));

  void *key = NULL;
  size_t keyLen = 0;
  int32_t i = 0;
  SQWSchStatus *sch = NULL;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
    sch = (SQWSchStatus *)pIter;
    qwDbgDumpSchInfo(sch, i);
    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));
}
D
dapan1121 已提交
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172

char *qwPhaseStr(int32_t phase) {
  switch (phase) {
    case QW_PHASE_PRE_QUERY:
      return "PRE_QUERY";
    case QW_PHASE_POST_QUERY:
      return "POST_QUERY";
    case QW_PHASE_PRE_FETCH:
      return "PRE_FETCH";
    case QW_PHASE_POST_FETCH:
      return "POST_FETCH";
    case QW_PHASE_PRE_CQUERY:
      return "PRE_CQUERY";
    case QW_PHASE_POST_CQUERY:
      return "POST_CQUERY";
    default:
      break;
  }

  return "UNKNOWN";
}

char *qwBufStatusStr(int32_t bufStatus) {
  switch (bufStatus) {
    case DS_BUF_LOW:
      return "LOW";
    case DS_BUF_FULL:
      return "FULL";
    case DS_BUF_EMPTY:
      return "EMPTY";
    default:
      break;
  }

  return "UNKNOWN";
}

D
dapan1121 已提交
173
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
D
dapan1121 已提交
174
  int32_t code = 0;
D
dapan1121 已提交
175
  int8_t origStatus = 0;
D
dapan1121 已提交
176
  bool ignore = false;
D
dapan1121 已提交
177 178 179 180

  while (true) {
    origStatus = atomic_load_8(&task->status);
    
D
dapan1121 已提交
181 182 183 184
    QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore));
    if (ignore) {
      break;
    }
D
dapan1121 已提交
185 186 187
    
    if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) {
      continue;
D
dapan1121 已提交
188
    }
D
dapan1121 已提交
189
    
D
dapan1121 已提交
190
    QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(origStatus), jobTaskStatusStr(status));
D
dapan1121 已提交
191 192

    break;
D
dapan1121 已提交
193
  }
D
dapan1121 已提交
194
  
D
dapan1121 已提交
195 196 197 198
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
199
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) {
D
dapan1121 已提交
200
  SQWSchStatus newSch = {0};
D
dapan1121 已提交
201 202
  newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == newSch.tasksHash) {
D
dapan1121 已提交
203 204
    QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
205 206
  }

D
dapan1121 已提交
207 208 209 210 211
  QW_LOCK(QW_WRITE, &mgmt->schLock);
  int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      QW_UNLOCK(QW_WRITE, &mgmt->schLock);
D
dapan1121 已提交
212
      
D
dapan1121 已提交
213 214 215
      QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
      taosHashCleanup(newSch.tasksHash);
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
216
    }
D
dapan1121 已提交
217 218

    taosHashCleanup(newSch.tasksHash);
D
dapan1121 已提交
219
  }
D
dapan1121 已提交
220
  QW_UNLOCK(QW_WRITE, &mgmt->schLock);
D
dapan1121 已提交
221

D
dapan1121 已提交
222
  return TSDB_CODE_SUCCESS;  
D
dapan1121 已提交
223 224
}

D
dapan1121 已提交
225
int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
D
dapan1121 已提交
226 227 228 229 230 231 232
  while (true) {
    QW_LOCK(rwType, &mgmt->schLock);
    *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
    if (NULL == (*sch)) {
      QW_UNLOCK(rwType, &mgmt->schLock);
      
      if (QW_NOT_EXIST_ADD == nOpt) {
D
dapan1121 已提交
233
        QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));
D
dapan1121 已提交
234 235 236 237 238 239 240

        nOpt = QW_NOT_EXIST_RET_ERR;
        
        continue;
      } else if (QW_NOT_EXIST_RET_ERR == nOpt) {
        QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
      } else {
D
dapan1121 已提交
241
        QW_SCH_ELOG("unknown notExistOpt:%d", nOpt);
D
dapan1121 已提交
242
        QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
D
dapan1121 已提交
243
      }
D
dapan1121 已提交
244
    }
D
dapan1121 已提交
245 246

    break;
D
dapan1121 已提交
247 248 249 250 251
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
252 253
int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
  return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
D
dapan1121 已提交
254 255
}

D
dapan1121 已提交
256 257
int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
  return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
D
dapan1121 已提交
258 259
}

D
dapan1121 已提交
260
void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
D
dapan1121 已提交
261 262 263
  QW_UNLOCK(rwType, &mgmt->schLock);
}

D
dapan1121 已提交
264

D
dapan1121 已提交
265
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
D
dapan1121 已提交
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

  QW_LOCK(rwType, &sch->tasksLock);
  *task = taosHashGet(sch->tasksHash, id, sizeof(id));
  if (NULL == (*task)) {
    QW_UNLOCK(rwType, &sch->tasksLock);
    QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
  }

  return TSDB_CODE_SUCCESS;
}



D
dapan1121 已提交
281
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
D
dapan1121 已提交
282 283
  int32_t code = 0;

D
dapan1121 已提交
284 285
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
D
dapan1121 已提交
286

D
dapan1121 已提交
287 288
  SQWTaskStatus ntask = {0};
  ntask.status = status;
D
dapan1121 已提交
289
  ntask.refId = rId;
D
dapan1121 已提交
290 291 292 293 294 295

  QW_LOCK(QW_WRITE, &sch->tasksLock);
  code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
  if (0 != code) {
    QW_UNLOCK(QW_WRITE, &sch->tasksLock);
    if (HASH_NODE_EXIST(code)) {
D
dapan1121 已提交
296 297
      if (rwType && task) {
        QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
D
dapan1121 已提交
298
      } else {
D
dapan1121 已提交
299
        QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status));
D
dapan1121 已提交
300
        QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
D
dapan1121 已提交
301 302
      }
    } else {
D
dapan1121 已提交
303
      QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
304
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
305 306 307
    }
  }
  QW_UNLOCK(QW_WRITE, &sch->tasksLock);
D
dapan1121 已提交
308

D
dapan1121 已提交
309 310
  QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status));

D
dapan1121 已提交
311 312
  if (rwType && task) {
    QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
D
dapan1121 已提交
313 314
  }

D
dapan1121 已提交
315 316
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
317

D
dapan1121 已提交
318
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
D
dapan1121 已提交
319 320
  SQWSchStatus *tsch = NULL;
  int32_t code = 0;
D
dapan1121 已提交
321
  QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch));
D
dapan1121 已提交
322

D
dapan1121 已提交
323
  QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL));
D
dapan1121 已提交
324 325 326 327

_return:

  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
328 329
  
  QW_RET(code);
D
dapan1121 已提交
330 331 332
}


D
dapan1121 已提交
333
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) {
D
dapan1121 已提交
334
  return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
D
dapan1121 已提交
335
}
D
dapan1121 已提交
336 337


D
dapan1121 已提交
338
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
D
dapan1121 已提交
339
  QW_UNLOCK(rwType, &sch->tasksLock);
D
dapan1121 已提交
340 341
}

D
dapan1121 已提交
342

D
dapan1121 已提交
343
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
D
dapan1121 已提交
344 345
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
H
Haojun Liao 已提交
346

D
dapan1121 已提交
347
  *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
D
dapan1121 已提交
348
  if (NULL == (*ctx)) {
D
dapan1121 已提交
349
    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
D
dapan1121 已提交
350
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
351 352 353 354 355
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
356
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
D
dapan1121 已提交
357 358 359 360 361
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
  
  *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == (*ctx)) {
D
dapan1121 已提交
362
    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
D
dapan1121 已提交
363
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
364 365 366 367 368
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
369
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
D
dapan1121 已提交
370 371 372
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

D
dapan1121 已提交
373
  SQWTaskCtx nctx = {0};
D
dapan1121 已提交
374

D
dapan1121 已提交
375
  int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
D
dapan1121 已提交
376 377
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
D
dapan1121 已提交
378
      if (acquire && ctx) {
D
dapan1121 已提交
379
        QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
380 381
      } else if (ctx) {
        QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
382
      } else {
D
dapan1121 已提交
383
        QW_TASK_ELOG_E("task ctx already exist");
D
dapan1121 已提交
384 385 386
        QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
      }
    } else {
D
dapan1121 已提交
387
      QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code);
D
dapan1121 已提交
388 389 390 391
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
  }

D
dapan1121 已提交
392
  if (acquire && ctx) {
D
dapan1121 已提交
393
    QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
394 395
  } else if (ctx) {
    QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
D
dapan1121 已提交
396 397 398 399 400
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
401
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) {
D
dapan1121 已提交
402
  QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL));
D
dapan1121 已提交
403 404
}

D
dapan1121 已提交
405
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
D
dapan1121 已提交
406
  return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx);
D
dapan1121 已提交
407 408
}

D
dapan1121 已提交
409 410
void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) {
  taosHashRelease(mgmt->ctxHash, ctx);
D
dapan1121 已提交
411 412
}

D
dapan1121 已提交
413
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {  
D
dapan1121 已提交
414
  // Note: free/kill may in RC
D
dapan1121 已提交
415 416 417
  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
  if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
    qDestroyTask(otaskHandle);
D
dapan1121 已提交
418 419
  }
}
D
dapan1121 已提交
420

D
dapan1121 已提交
421 422
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  int32_t code = 0;
D
dapan1121 已提交
423
  // Note: free/kill may in RC
D
dapan1121 已提交
424 425
  qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
  if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
D
dapan1121 已提交
426
    code = qAsyncKillTask(taskHandle);
D
dapan1121 已提交
427
    atomic_store_ptr(&ctx->taskHandle, taskHandle);
D
dapan1121 已提交
428
  }
D
dapan1121 已提交
429

D
dapan1121 已提交
430 431 432
  QW_RET(code);
}

D
dapan1121 已提交
433

D
dapan1121 已提交
434
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
D
dapan1121 已提交
435 436 437 438
  tmsgReleaseHandle(ctx->ctrlConnInfo.handle, TAOS_CONN_SERVER);
  ctx->ctrlConnInfo.handle = NULL;

  // NO need to release dataConnInfo
D
dapan1121 已提交
439

D
dapan1121 已提交
440
  qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
D
dapan1121 已提交
441 442 443 444
  
  if (ctx->sinkHandle) {
    dsDestroyDataSinker(ctx->sinkHandle);
    ctx->sinkHandle = NULL;
D
dapan1121 已提交
445
  }
D
dapan1121 已提交
446
}
D
dapan1121 已提交
447 448


D
dapan1121 已提交
449
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
D
dapan1121 已提交
450 451 452
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
  SQWTaskCtx octx;
D
dapan1121 已提交
453

D
dapan1121 已提交
454 455
  SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == ctx) {
D
dapan1121 已提交
456
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
457
  }
D
dapan1121 已提交
458

D
dapan1121 已提交
459
  octx = *ctx;
D
dapan1121 已提交
460

D
dapan1121 已提交
461 462 463
  atomic_store_ptr(&ctx->taskHandle, NULL);
  atomic_store_ptr(&ctx->sinkHandle, NULL);

D
dapan1121 已提交
464 465
  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);

D
dapan1121 已提交
466
  if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
D
dapan1121 已提交
467
    QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");    
D
dapan1121 已提交
468
    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
D
dapan1121 已提交
469 470
  }

D
dapan1121 已提交
471
  qwFreeTask(QW_FPARAMS(), &octx);
D
dapan1121 已提交
472 473

  QW_TASK_DLOG_E("task ctx dropped");
D
dapan1121 已提交
474
  
D
dapan1121 已提交
475 476 477
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
478
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
D
dapan1121 已提交
479
  SQWSchStatus *sch = NULL;
D
dapan1121 已提交
480 481 482 483 484
  SQWTaskStatus *task = NULL;
  int32_t code = 0;
  
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);
D
dapan1121 已提交
485

D
dapan1121 已提交
486
  if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
D
dapan1121 已提交
487
    QW_TASK_WLOG_E("scheduler does not exist");
D
dapan1121 已提交
488 489
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
490

D
dapan1121 已提交
491 492 493
  if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
    qwReleaseScheduler(QW_WRITE, mgmt);
    
D
dapan1121 已提交
494
    QW_TASK_WLOG_E("task does not exist");
D
dapan1121 已提交
495 496
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
497

D
dapan1121 已提交
498
  if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
D
dapan1121 已提交
499
    QW_TASK_ELOG_E("taosHashRemove task from hash failed");
D
dapan1121 已提交
500 501 502
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
503
  QW_TASK_DLOG_E("task status dropped");
D
dapan1121 已提交
504 505 506

_return:

D
dapan1121 已提交
507 508 509
  if (task) {
    qwReleaseTaskStatus(QW_WRITE, sch);
  }
D
dapan1121 已提交
510 511 512 513 514
  qwReleaseScheduler(QW_WRITE, mgmt);
  
  QW_RET(code);
}

D
dapan1121 已提交
515
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
D
dapan1121 已提交
516 517
  SQWSchStatus *sch = NULL;
  SQWTaskStatus *task = NULL;
D
dapan1121 已提交
518 519
  int32_t code = 0;

D
dapan1121 已提交
520
  QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
D
dapan1121 已提交
521
  QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
D
dapan1121 已提交
522

D
dapan1121 已提交
523
  QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));
D
dapan1121 已提交
524
  
D
dapan1121 已提交
525 526
_return:

D
dapan1121 已提交
527 528 529
  if (task) {
    qwReleaseTaskStatus(QW_READ, sch);
  }
D
dapan1121 已提交
530
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
531 532 533 534

  QW_RET(code);
}

D
dapan1121 已提交
535
int32_t qwDropTask(QW_FPARAMS_DEF) {
H
Haojun Liao 已提交
536
  QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
D
dapan1121 已提交
537
  QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
H
Haojun Liao 已提交
538

D
dapan1121 已提交
539 540 541
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
542 543 544 545 546 547 548 549 550 551

int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  qTaskInfo_t *taskHandle = &ctx->taskHandle; 

  if (TASK_TYPE_TEMP == ctx->taskType) {
    if (ctx->explain) {
      SExplainExecInfo *execInfo = NULL;
      int32_t resNum = 0;
      QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));

D
dapan1121 已提交
552 553 554 555
      SQWConnInfo connInfo = {0};
      connInfo.handle = ctx->ctrlConnInfo.handle;
      
      QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
D
dapan1121 已提交
556 557 558 559 560 561 562 563 564
    }
    
    qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
565
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
D
dapan1121 已提交
566 567 568 569
  int32_t code = 0;
  bool  qcontinue = true;
  SSDataBlock* pRes = NULL;
  uint64_t useconds = 0;
D
dapan1121 已提交
570
  int32_t i = 0;
D
dapan1121 已提交
571 572 573
  int32_t execNum = 0;
  qTaskInfo_t *taskHandle = &ctx->taskHandle; 
  DataSinkHandle sinkHandle = ctx->sinkHandle;
D
dapan1121 已提交
574 575
 
  while (true) {
H
Haojun Liao 已提交
576
    QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
D
dapan1121 已提交
577

D
dapan1121 已提交
578
    code = qExecTask(*taskHandle, &pRes, &useconds);
D
dapan1121 已提交
579
    if (code) {
D
dapan1121 已提交
580 581
      QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
D
dapan1121 已提交
582 583
    }

D
dapan1121 已提交
584 585
    ++execNum;

D
dapan1121 已提交
586
    if (NULL == pRes) {
D
dapan1121 已提交
587
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%"PRIu64, useconds);
H
Haojun Liao 已提交
588

D
dapan1121 已提交
589
      dsEndPut(sinkHandle, useconds);
D
dapan1121 已提交
590 591

      QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
D
dapan1121 已提交
592 593 594 595

      if (queryEnd) {
        *queryEnd = true;
      }
D
dapan1121 已提交
596
      
D
dapan1121 已提交
597 598 599
      break;
    }

D
dapan1121 已提交
600
    int32_t rows = pRes->info.rows;
H
Haojun Liao 已提交
601

602 603
    ASSERT(pRes->info.rows > 0);

H
Haojun Liao 已提交
604
    SInputData inputData = {.pData = pRes};
D
dapan1121 已提交
605 606
    code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
    if (code) {
D
dapan1121 已提交
607 608
      QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
D
dapan1121 已提交
609
    }
D
dapan1121 已提交
610

D
dapan1121 已提交
611
    QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);
D
dapan1121 已提交
612
    
D
dapan1121 已提交
613 614 615 616
    if (!qcontinue) {
      break;
    }

D
dapan1121 已提交
617 618 619 620 621
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
      break;
    }

    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
622 623
      break;
    }
D
dapan1121 已提交
624

D
dapan1121 已提交
625 626 627
    if (atomic_load_32(&ctx->rspCode)) {
      break;
    }
D
dapan1121 已提交
628 629 630 631
  }

  QW_RET(code);
}
D
dapan1121 已提交
632

D
dapan1121 已提交
633
int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
D
dapan1121 已提交
634 635
  int32_t taskNum = 0;

D
dapan1121 已提交
636
  hbInfo->connInfo = sch->hbConnInfo;
D
dapan1121 已提交
637
  hbInfo->rsp.epId = sch->hbEpId;
D
dapan1121 已提交
638

D
dapan1121 已提交
639 640 641
  QW_LOCK(QW_READ, &sch->tasksLock);
  
  taskNum = taosHashGetSize(sch->tasksHash);
D
dapan1121 已提交
642 643 644

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
D
dapan1121 已提交
645
    QW_UNLOCK(QW_READ, &sch->tasksLock);
D
dapan1121 已提交
646
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
D
dapan1121 已提交
647 648 649 650 651 652
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  void *key = NULL;
  size_t keyLen = 0;
  int32_t i = 0;
D
dapan1121 已提交
653
  STaskStatus status = {0};
D
dapan1121 已提交
654 655 656 657

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
D
dapan1121 已提交
658
    key = taosHashGetKey(pIter, &keyLen);
D
dapan1121 已提交
659 660 661

    //TODO GET EXECUTOR API TO GET MORE INFO

D
dapan1121 已提交
662 663 664 665 666
    QW_GET_QTID(key, status.queryId, status.taskId);
    status.status = taskStatus->status;
    status.refId = taskStatus->refId;
    
    taosArrayPush(hbInfo->rsp.taskStatus, &status);
D
dapan1121 已提交
667 668 669 670 671 672 673 674 675 676
    
    ++i;
    pIter = taosHashIterate(sch->tasksHash, pIter);
  }  

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
677 678

int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
D
dapan1121 已提交
679 680 681
  int32_t len = 0;
  SRetrieveTableRsp *rsp = NULL;
  bool queryEnd = false;
D
dapan1121 已提交
682 683
  int32_t code = 0;

D
dapan1121 已提交
684
  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
D
dapan1121 已提交
685

D
dapan1121 已提交
686 687 688 689
  if (len < 0) {
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
D
dapan1121 已提交
690

D
dapan1121 已提交
691 692
  if (len == 0) {
    if (queryEnd) {
D
dapan 已提交
693
      code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
694
      if (code) {
D
dapan1121 已提交
695
        QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
696 697 698
        QW_ERR_RET(code);
      }
    
D
dapan1121 已提交
699
      QW_TASK_DLOG_E("no data in sink and query end");
H
Haojun Liao 已提交
700

D
dapan1121 已提交
701
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
L
Liu Jicong 已提交
702
      QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
703

D
dapan1121 已提交
704
      *rspMsg = rsp;
D
dapan 已提交
705
      *dataLen = 0;
D
dapan1121 已提交
706 707
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
708 709

    pOutput->bufStatus = DS_BUF_EMPTY;
D
dapan1121 已提交
710

D
dapan1121 已提交
711
    return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
712
  }
D
dapan1121 已提交
713

D
dapan1121 已提交
714
  // Got data from sink
D
dapan1121 已提交
715
  QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
D
dapan1121 已提交
716

D
dapan 已提交
717
  *dataLen = len;
D
dapan1121 已提交
718
  
D
dapan1121 已提交
719
  QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
D
dapan 已提交
720
  *rspMsg = rsp;
D
dapan1121 已提交
721
  
D
dapan 已提交
722 723
  pOutput->pData = rsp->data;
  code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
724
  if (code) {
D
dapan1121 已提交
725
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
726 727
    QW_ERR_RET(code);
  }
D
dapan1121 已提交
728

D
dapan1121 已提交
729
  if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
D
dapan1121 已提交
730
    QW_TASK_DLOG_E("task all data fetched, done");
D
dapan1121 已提交
731
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
D
dapan1121 已提交
732 733
  }

D
dapan1121 已提交
734
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
735 736
}

D
dapan1121 已提交
737
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
D
dapan1121 已提交
738 739
  int32_t code = 0;
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
740 741
  SQWConnInfo *dropConnection = NULL;
  SQWConnInfo *cancelConnection = NULL;
D
dapan1121 已提交
742

D
dapan1121 已提交
743
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
744 745 746 747 748 749 750 751

  if (QW_PHASE_PRE_QUERY == phase) {
    QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
  } else {
    QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
  }
  
  QW_LOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
752

D
dapan1121 已提交
753
  if (QW_PHASE_PRE_FETCH == phase) {
X
Xiaoyu Wang 已提交
754
    atomic_store_8((int8_t*)&ctx->queryFetched, true);
D
dapan1121 已提交
755
  } else {
D
dapan1121 已提交
756 757
    atomic_store_8(&ctx->phase, phase);
  }
D
dapan1121 已提交
758

X
Xiaoyu Wang 已提交
759
  if (atomic_load_8((int8_t*)&ctx->queryEnd)) {
D
dapan1121 已提交
760 761 762
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }
D
dapan1121 已提交
763

D
dapan1121 已提交
764 765
  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
D
dapan1121 已提交
766
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
H
Haojun Liao 已提交
767
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
768
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
769 770
        break;
      }
D
dapan1121 已提交
771

D
dapan1121 已提交
772
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
773
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
774
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
775
        dropConnection = NULL;
776

D
dapan1121 已提交
777 778
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
779

D
dapan1121 已提交
780
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan 已提交
781
        break;
D
dapan1121 已提交
782
      }
D
dapan1121 已提交
783

D
dapan1121 已提交
784
      QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
D
dapan1121 已提交
785 786
      break;
    }
D
dapan1121 已提交
787
    case QW_PHASE_PRE_FETCH: {
D
dapan1121 已提交
788 789
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
790 791
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }
D
dapan1121 已提交
792

D
dapan1121 已提交
793
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
794
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
795 796 797 798
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

      if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
799
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
800
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
D
dapan1121 已提交
801 802 803
      }
      break;
    }    
D
dapan1121 已提交
804 805
    case QW_PHASE_PRE_CQUERY: {
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
806
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
807
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
808
      }
D
dapan1121 已提交
809

D
dapan1121 已提交
810
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
811
        dropConnection = &ctx->ctrlConnInfo;
D
dapan1121 已提交
812
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
813
        dropConnection = NULL;
H
Haojun Liao 已提交
814

D
dapan1121 已提交
815 816
        qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
817

D
dapan1121 已提交
818
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
819
      }
D
dapan1121 已提交
820

D
dapan1121 已提交
821
      break;
D
dapan1121 已提交
822 823 824 825 826 827 828 829 830
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
    QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
831
  }
D
dapan1121 已提交
832

D
dapan1121 已提交
833
_return:
D
dapan1121 已提交
834

D
dapan1121 已提交
835
  if (ctx) {
D
dapan1121 已提交
836
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
837
    
D
dapan1121 已提交
838
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
839 840
    qwReleaseTaskCtx(mgmt, ctx);
  }
D
dapan1121 已提交
841

D
dapan1121 已提交
842
  if (dropConnection) {
S
shm  
Shengliang Guan 已提交
843
    qwBuildAndSendDropRsp(dropConnection, code);
D
dapan1121 已提交
844
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
845
  }
D
dapan1121 已提交
846

D
dapan1121 已提交
847
  if (cancelConnection) {
S
shm  
Shengliang Guan 已提交
848
    qwBuildAndSendCancelRsp(cancelConnection, code);
D
dapan1121 已提交
849
    QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
850 851
  }

D
dapan1121 已提交
852
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
853 854 855 856 857 858 859 860

  QW_RET(code);
}


int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
  int32_t code = 0;
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
861
  SQWConnInfo connInfo = {0};
D
dapan1121 已提交
862
  SQWConnInfo *readyConnection = NULL;
D
dapan1121 已提交
863

D
dapan1121 已提交
864
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
865 866 867 868 869 870
  
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
  
  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
871
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
872 873 874 875
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (QW_PHASE_POST_QUERY == phase) {
D
dapan1121 已提交
876
#if 0    
D
dapan1121 已提交
877
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
878
      readyConnection = &ctx->connInfo;
D
dapan1121 已提交
879 880
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
    }
D
dapan1121 已提交
881
#else
D
dapan1121 已提交
882
    connInfo.handle = ctx->ctrlConnInfo.handle;
D
dapan1121 已提交
883 884 885 886
    readyConnection = &connInfo;
    
    QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
#endif
D
dapan1121 已提交
887 888
  }

D
dapan1121 已提交
889
  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
890 891 892 893
    if (QW_PHASE_POST_FETCH == phase) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }
D
dapan1121 已提交
894

D
dapan1121 已提交
895 896
    qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
897

D
dapan1121 已提交
898 899
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
900 901 902
  }

  if (ctx->rspCode) {
D
dapan1121 已提交
903 904
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
905 906
  }      

D
dapan1121 已提交
907
  QW_ERR_JRET(input->code);
D
dapan1121 已提交
908 909 910

_return:

D
dapan1121 已提交
911 912 913 914
  if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
  }

D
dapan1121 已提交
915
  if (ctx) {
D
dapan1121 已提交
916
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
917

D
dapan1121 已提交
918 919 920
    if (QW_PHASE_POST_FETCH != phase) {
      atomic_store_8(&ctx->phase, phase);
    }
D
dapan1121 已提交
921
    
D
dapan1121 已提交
922
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
923
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
924 925
  }

D
dapan1121 已提交
926
  if (readyConnection) {
S
shm  
Shengliang Guan 已提交
927
    qwBuildAndSendReadyRsp(readyConnection, code);    
D
dapan1121 已提交
928
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
929 930
  }

D
dapan1121 已提交
931 932
  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
D
dapan1121 已提交
933 934
  }

D
dapan1121 已提交
935
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
936

D
dapan1121 已提交
937 938 939
  QW_RET(code);
}

D
dapan1121 已提交
940
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
D
dapan1121 已提交
941 942 943 944
  int32_t code = 0;
  bool queryRsped = false;
  struct SSubplan *plan = NULL;
  SQWPhaseInput input = {0};
D
dapan1121 已提交
945 946
  qTaskInfo_t pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
D
dapan1121 已提交
947
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
948

D
dapan1121 已提交
949 950
  QW_ERR_JRET(qwRegisterBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));

D
dapan1121 已提交
951
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
D
dapan1121 已提交
952 953 954 955

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
  
  atomic_store_8(&ctx->taskType, taskType);
D
dapan1121 已提交
956
  atomic_store_8(&ctx->explain, explain);
X
Xiaoyu Wang 已提交
957

D
dapan1121 已提交
958 959
  atomic_store_ptr(&ctx->ctrlConnInfo.handle, qwMsg->connInfo.handle);
  atomic_store_ptr(&ctx->ctrlConnInfo.ahandle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
960 961

  QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
X
Xiaoyu Wang 已提交
962

D
dapan1121 已提交
963 964
  code = qStringToSubplan(qwMsg->msg, &plan);
  if (TSDB_CODE_SUCCESS != code) {
D
dapan1121 已提交
965
    QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
966
    QW_ERR_JRET(code);
D
dapan1121 已提交
967
  }
D
dapan1121 已提交
968
  
969
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
970
  if (code) {
D
dapan1121 已提交
971
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
972
    QW_ERR_JRET(code);
D
dapan1121 已提交
973
  }
D
dapan1121 已提交
974

H
Haojun Liao 已提交
975
  if (NULL == sinkHandle || NULL == pTaskInfo) {
D
dapan1121 已提交
976 977 978 979
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
980 981
  //QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
  //QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
982

D
dapan1121 已提交
983
  //queryRsped = true;
D
dapan1121 已提交
984

D
dapan1121 已提交
985 986 987
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

D
dapan1121 已提交
988
  if (pTaskInfo && sinkHandle) {
D
dapan1121 已提交
989
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
D
dapan1121 已提交
990
  }
D
dapan1121 已提交
991
  
D
dapan1121 已提交
992 993
_return:

D
dapan1121 已提交
994 995
  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
996

D
dapan1121 已提交
997 998 999 1000
  //if (!queryRsped) {
  //  qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
  //  QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  //}
D
dapan1121 已提交
1001

D
dapan1121 已提交
1002
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1003 1004
}

D
dapan1121 已提交
1005
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
D
dapan1121 已提交
1006 1007
  int32_t code = 0;
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
1008
  int8_t phase = 0;
D
dapan1121 已提交
1009
  bool needRsp = true;
D
dapan1121 已提交
1010 1011

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1012

D
dapan1121 已提交
1013 1014
  QW_LOCK(QW_WRITE, &ctx->lock);

D
dapan1121 已提交
1015 1016 1017
  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG_E("task is dropping or already dropped");
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
1018
  }
D
dapan1121 已提交
1019
  
D
dapan1121 已提交
1020
  if (ctx->phase == QW_PHASE_PRE_QUERY) {
D
dapan1121 已提交
1021 1022
    ctx->ctrlConnInfo.handle == qwMsg->connInfo.handle;
    ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
D
dapan1121 已提交
1023
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
D
dapan1121 已提交
1024 1025 1026 1027 1028 1029 1030
    needRsp = false;
    QW_TASK_DLOG_E("ready msg will not rsp now");
    goto _return;
  }

  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);

X
Xiaoyu Wang 已提交
1031 1032
  if (atomic_load_8((int8_t*)&ctx->queryEnd) || atomic_load_8((int8_t*)&ctx->queryFetched)) {
    QW_TASK_ELOG("got ready msg at wrong status, queryEnd:%d, queryFetched:%d", atomic_load_8((int8_t*)&ctx->queryEnd), atomic_load_8((int8_t*)&ctx->queryFetched));
D
dapan1121 已提交
1033 1034 1035
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }

D
dapan1121 已提交
1036 1037 1038
  if (ctx->phase == QW_PHASE_POST_QUERY) {
    code = ctx->rspCode;
    goto _return;
D
dapan1121 已提交
1039 1040
  }

D
dapan1121 已提交
1041
  QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase));
H
Haojun Liao 已提交
1042

D
dapan1121 已提交
1043
  QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
1044 1045 1046

_return:

D
dapan1121 已提交
1047
  if (code && ctx) {
D
dapan1121 已提交
1048
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
1049 1050
  }

D
dapan1121 已提交
1051 1052 1053
  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
  }
H
Haojun Liao 已提交
1054

D
dapan1121 已提交
1055 1056
  if (ctx) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
1057
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
1058 1059
  }

D
dapan1121 已提交
1060
  if (needRsp) {
S
shm  
Shengliang Guan 已提交
1061
    qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
D
dapan1121 已提交
1062
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
1063 1064
  }

D
dapan1121 已提交
1065
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1066 1067 1068
}


D
dapan1121 已提交
1069
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
D
dapan1121 已提交
1070
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
1071
  int32_t code = 0;
1072 1073 1074
  SQWPhaseInput input = {0};
  void *rsp = NULL;
  int32_t dataLen = 0;
D
dapan1121 已提交
1075
  bool queryEnd = false;
D
dapan1121 已提交
1076 1077
  
  do {
D
dapan1121 已提交
1078
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
D
dapan1121 已提交
1079

D
dapan1121 已提交
1080
    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1081

X
Xiaoyu Wang 已提交
1082 1083
    atomic_store_8((int8_t*)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t*)&ctx->queryContinue, 0);
D
dapan1121 已提交
1084

D
dapan1121 已提交
1085
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));
D
dapan1121 已提交
1086

D
dapan1121 已提交
1087 1088 1089 1090 1091
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      SOutputData sOutput = {0};
      QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
      
      if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {    
D
dapan1121 已提交
1092
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
D
dapan1121 已提交
1093
        
X
Xiaoyu Wang 已提交
1094
        atomic_store_8((int8_t*)&ctx->queryContinue, 1);
1095
      }
D
dapan1121 已提交
1096 1097
      
      if (rsp) {
D
dapan1121 已提交
1098
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
D
dapan1121 已提交
1099
        
D
dapan1121 已提交
1100
        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
1101 1102 1103
        if (qComplete) {
          atomic_store_8((int8_t*)&ctx->queryEnd, true);
        }
H
Haojun Liao 已提交
1104

D
dapan1121 已提交
1105
        qwMsg->connInfo = ctx->dataConnInfo;
D
dapan1121 已提交
1106
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);            
1107
        
S
shm  
Shengliang Guan 已提交
1108
        qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);                
D
dapan1121 已提交
1109
        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
D
dapan1121 已提交
1110
      } else {
X
Xiaoyu Wang 已提交
1111
        atomic_store_8((int8_t*)&ctx->queryContinue, 1);
1112 1113 1114
      }
    }

D
dapan1121 已提交
1115
_return:
1116

D
dapan1121 已提交
1117 1118 1119 1120
    if (NULL == ctx) {
      break;
    }

D
dapan1121 已提交
1121
    if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
1122
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);    
1123 1124
      qwFreeFetchRsp(rsp);
      rsp = NULL;
D
dapan1121 已提交
1125
      
D
dapan1121 已提交
1126
      qwMsg->connInfo = ctx->dataConnInfo;
S
shm  
Shengliang Guan 已提交
1127
      qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
D
dapan1121 已提交
1128
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0);
1129
    }
D
dapan1121 已提交
1130

D
dapan1121 已提交
1131
    QW_LOCK(QW_WRITE, &ctx->lock);
X
Xiaoyu Wang 已提交
1132
    if (queryEnd || code || 0 == atomic_load_8((int8_t*)&ctx->queryContinue)) {
D
dapan1121 已提交
1133
      // Note: if necessary, fetch need to put cquery to queue again
D
dapan1121 已提交
1134 1135 1136 1137 1138 1139
      atomic_store_8(&ctx->phase, 0);
      QW_UNLOCK(QW_WRITE,&ctx->lock);
      break;
    }
    QW_UNLOCK(QW_WRITE,&ctx->lock);
  } while (true);
D
dapan1121 已提交
1140

D
dapan1121 已提交
1141
  input.code = code;
D
dapan1121 已提交
1142 1143 1144
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);    

  QW_RET(TSDB_CODE_SUCCESS);  
D
dapan1121 已提交
1145
}
D
dapan1121 已提交
1146

D
dapan1121 已提交
1147

D
dapan1121 已提交
1148
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
D
dapan1121 已提交
1149
  int32_t code = 0;
D
dapan1121 已提交
1150 1151 1152 1153 1154
  int32_t dataLen = 0;
  bool locked = false;
  SQWTaskCtx *ctx = NULL;
  void *rsp = NULL;
  SQWPhaseInput input = {0};
D
dapan1121 已提交
1155

D
dapan1121 已提交
1156
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
1157

1158
  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1159
  
D
dapan 已提交
1160 1161
  SOutputData sOutput = {0};
  QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
D
dapan1121 已提交
1162

1163
  if (NULL == rsp) {
D
dapan1121 已提交
1164 1165
    atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle);
    atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
1166
    
1167
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
D
dapan1121 已提交
1168
  } else {
D
dapan1121 已提交
1169
    bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
D
dapan1121 已提交
1170
    
D
dapan1121 已提交
1171
    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
1172 1173 1174
    if (qComplete) {
      atomic_store_8((int8_t*)&ctx->queryEnd, true);
    }
D
dapan1121 已提交
1175 1176
  }

D
dapan1121 已提交
1177
  if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {    
D
dapan1121 已提交
1178
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
D
dapan1121 已提交
1179

D
dapan1121 已提交
1180 1181
    QW_LOCK(QW_WRITE, &ctx->lock);
    locked = true;
1182

D
dapan1121 已提交
1183
    // RC WARNING
D
dapan1121 已提交
1184
    if (QW_IS_QUERY_RUNNING(ctx)) {
X
Xiaoyu Wang 已提交
1185 1186
      atomic_store_8((int8_t*)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t*)&ctx->queryInQueue)) {
H
Haojun Liao 已提交
1187
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);
D
dapan1121 已提交
1188

X
Xiaoyu Wang 已提交
1189
      atomic_store_8((int8_t*)&ctx->queryInQueue, 1);
1190
      
D
dapan1121 已提交
1191
      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
1192
    }
D
dapan 已提交
1193 1194
  }
  
D
dapan1121 已提交
1195
_return:
D
dapan1121 已提交
1196

D
dapan1121 已提交
1197 1198 1199 1200 1201
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
D
dapan1121 已提交
1202
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
D
dapan1121 已提交
1203

D
dapan 已提交
1204 1205 1206
  if (code) {
    qwFreeFetchRsp(rsp);
    rsp = NULL;
D
dapan1121 已提交
1207
    dataLen = 0;
D
dapan1121 已提交
1208 1209 1210
  }

  if (code || rsp) {
S
shm  
Shengliang Guan 已提交
1211
    qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
D
dapan1121 已提交
1212
    QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
D
dapan1121 已提交
1213 1214
  }

D
dapan1121 已提交
1215
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1216
}
D
dapan1121 已提交
1217

1218

D
dapan1121 已提交
1219
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
D
dapan1121 已提交
1220
  int32_t code = 0;
D
dapan1121 已提交
1221
  bool rsped = false;
D
dapan1121 已提交
1222 1223 1224
  SQWTaskCtx *ctx = NULL;
  bool locked = false;

D
dapan1121 已提交
1225 1226
  // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED

D
dapan1121 已提交
1227
  QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
1228
  
D
dapan1121 已提交
1229 1230 1231 1232 1233
  QW_LOCK(QW_WRITE, &ctx->lock);

  locked = true;

  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
1234
    QW_TASK_WLOG_E("task already dropping");
D
dapan1121 已提交
1235 1236 1237
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

D
dapan1121 已提交
1238
  if (QW_IS_QUERY_RUNNING(ctx)) {
D
dapan1121 已提交
1239
    QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
D
dapan1121 已提交
1240
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
D
dapan1121 已提交
1241
  } else if (ctx->phase > 0) {
S
shm  
Shengliang Guan 已提交
1242
    qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
D
dapan1121 已提交
1243
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
1244

D
dapan1121 已提交
1245
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
1246
    rsped = true;
D
dapan1121 已提交
1247 1248
  } else {
    // task not started
D
dapan1121 已提交
1249
  }
D
dapan1121 已提交
1250

D
dapan1121 已提交
1251
  if (!rsped) {
D
dapan1121 已提交
1252 1253
    ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
    ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
D
dapan 已提交
1254
    
D
dapan1121 已提交
1255 1256
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }
1257

D
dapan1121 已提交
1258
_return:
D
dapan1121 已提交
1259

D
dapan1121 已提交
1260
  if (code) {
D
dapan1121 已提交
1261 1262 1263
    if (ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }
H
Haojun Liao 已提交
1264

D
dapan1121 已提交
1265
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
D
dapan1121 已提交
1266 1267
  }

D
dapan 已提交
1268 1269 1270 1271
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

D
dapan1121 已提交
1272
  if (ctx) {
D
dapan1121 已提交
1273
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
1274 1275
  }

D
dapan1121 已提交
1276
  if (TSDB_CODE_SUCCESS != code) {
S
shm  
Shengliang Guan 已提交
1277
    qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
D
dapan1121 已提交
1278
    QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
1279
  }
D
dapan1121 已提交
1280

D
dapan1121 已提交
1281
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1282
}
D
dapan1121 已提交
1283

D
dapan1121 已提交
1284 1285 1286 1287 1288
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t code = 0;
  SSchedulerHbRsp rsp = {0};
  SQWSchStatus *sch = NULL;
  uint64_t seqId = 0;
D
dapan1121 已提交
1289
  void *origHandle = NULL;
D
dapan1121 已提交
1290 1291 1292 1293 1294

  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
  
  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

D
dapan1121 已提交
1295
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
1296

D
dapan1121 已提交
1297
  if (sch->hbConnInfo.handle) {
S
shm  
Shengliang Guan 已提交
1298
    tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
D
dapan1121 已提交
1299
  }
D
dapan1121 已提交
1300
  
D
dapan1121 已提交
1301
  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
D
dapan1121 已提交
1302
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
D
dapan1121 已提交
1303
  
D
dapan1121 已提交
1304
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
1305 1306 1307
  
  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p",
    req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
1308

D
dapan1121 已提交
1309 1310 1311 1312
  qwReleaseScheduler(QW_READ, mgmt);

_return:

S
shm  
Shengliang Guan 已提交
1313
  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
D
dapan1121 已提交
1314
  QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
1315
  
D
dapan1121 已提交
1316
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1317 1318 1319 1320
}


void qwProcessHbTimerEvent(void *param, void *tmrId) {
D
dapan1121 已提交
1321 1322 1323
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
  SQWSchStatus *sch = NULL;
  int32_t taskNum = 0;
D
dapan1121 已提交
1324
  SQWHbInfo *rspList = NULL;
D
dapan1121 已提交
1325 1326
  int32_t code = 0;

D
dapan1121 已提交
1327 1328
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
1329 1330 1331 1332 1333
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1334 1335
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
    return;
D
dapan1121 已提交
1336 1337
  }

wafwerar's avatar
wafwerar 已提交
1338
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
1339 1340
  if (NULL == rspList) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1341 1342 1343
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
    return;
D
dapan1121 已提交
1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
  }

  void *key = NULL;
  size_t keyLen = 0;
  int32_t i = 0;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
1367
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
1368 1369
    QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),
            (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
D
dapan1121 已提交
1370
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
1371 1372
  }

wafwerar's avatar
wafwerar 已提交
1373
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
1374

D
dapan1121 已提交
1375
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);  
D
dapan1121 已提交
1376 1377
}

S
Shengliang Guan 已提交
1378 1379
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
  if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) {
D
dapan1121 已提交
1380 1381 1382
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
S
Shengliang 已提交
1383

D
dapan1121 已提交
1384
  int32_t code = 0;
wafwerar's avatar
wafwerar 已提交
1385
  SQWorkerMgmt *mgmt = taosMemoryCalloc(1, sizeof(SQWorkerMgmt));
D
dapan1121 已提交
1386 1387
  if (NULL == mgmt) {
    qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
D
dapan1121 已提交
1388
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1389 1390 1391 1392
  }

  if (cfg) {
    mgmt->cfg = *cfg;
D
dapan1121 已提交
1393
    if (0 == mgmt->cfg.maxSchedulerNum) {
D
dapan1121 已提交
1394
      mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
D
dapan1121 已提交
1395 1396
    }
    if (0 == mgmt->cfg.maxTaskNum) {
D
dapan1121 已提交
1397
      mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
D
dapan1121 已提交
1398 1399
    }
    if (0 == mgmt->cfg.maxSchTaskNum) {
D
dapan1121 已提交
1400
      mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1401
    }
D
dapan1121 已提交
1402
  } else {
D
dapan1121 已提交
1403 1404 1405
    mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
    mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
    mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1406 1407
  }

1408
  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1409
  if (NULL == mgmt->schHash) {
wafwerar's avatar
wafwerar 已提交
1410
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1411
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
D
dapan1121 已提交
1412
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1413 1414
  }

1415
  mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1416
  if (NULL == mgmt->ctxHash) {
D
dapan1121 已提交
1417
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
D
dapan1121 已提交
1418 1419 1420 1421 1422 1423 1424 1425 1426
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1427
  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer);
D
dapan1121 已提交
1428 1429 1430
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1431 1432
  }

D
dapan1121 已提交
1433 1434
  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
S
Shengliang Guan 已提交
1435
  mgmt->msgCb = *pMsgCb;
D
dapan1121 已提交
1436

D
dapan1121 已提交
1437 1438
  *qWorkerMgmt = mgmt;

D
dapan1121 已提交
1439 1440
  qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);

D
dapan1121 已提交
1441
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1442 1443 1444 1445 1446 1447 1448 1449

_return:

  taosHashCleanup(mgmt->schHash);
  taosHashCleanup(mgmt->ctxHash);

  taosTmrCleanUp(mgmt->timer);
  
wafwerar's avatar
wafwerar 已提交
1450
  taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1451 1452

  QW_RET(code);
D
dapan1121 已提交
1453
}
D
dapan1121 已提交
1454 1455 1456 1457

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1458
  }
D
dapan 已提交
1459

D
dapan1121 已提交
1460
  SQWorkerMgmt *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1461 1462 1463

  taosTmrStopA(&mgmt->hbTimer);
  taosTmrCleanUp(mgmt->timer);
D
dapan1121 已提交
1464
  
D
dapan1121 已提交
1465
  //TODO STOP ALL QUERY
D
dapan1121 已提交
1466

D
dapan1121 已提交
1467
  //TODO FREE ALL
D
dapan1121 已提交
1468

wafwerar's avatar
wafwerar 已提交
1469
  taosMemoryFreeClear(*qWorkerMgmt);
D
dapan1121 已提交
1470
}
D
dapan1121 已提交
1471

D
dapan1121 已提交
1472
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
D
dapan1121 已提交
1473
/*
D
dapan1121 已提交
1474 1475
  SQWSchStatus *sch = NULL;
  int32_t taskNum = 0;
1476

D
dapan1121 已提交
1477
  QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
D
dapan1121 已提交
1478 1479
  
  sch->lastAccessTs = taosGetTimestampSec();
1480

D
dapan1121 已提交
1481 1482 1483 1484 1485
  QW_LOCK(QW_READ, &sch->tasksLock);
  
  taskNum = taosHashGetSize(sch->tasksHash);
  
  int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
wafwerar's avatar
wafwerar 已提交
1486
  *rsp = taosMemoryCalloc(1, size);
D
dapan1121 已提交
1487
  if (NULL == *rsp) {
D
dapan1121 已提交
1488
    QW_SCH_ELOG("calloc %d failed", size);
D
dapan1121 已提交
1489 1490 1491 1492 1493
    QW_UNLOCK(QW_READ, &sch->tasksLock);
    qwReleaseScheduler(QW_READ, mgmt);
    
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
1494

D
dapan1121 已提交
1495 1496 1497
  void *key = NULL;
  size_t keyLen = 0;
  int32_t i = 0;
1498

D
dapan1121 已提交
1499 1500 1501 1502
  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
    taosHashGetKey(pIter, &key, &keyLen);
D
dapan1121 已提交
1503

D
dapan1121 已提交
1504 1505 1506
    QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
    (*rsp)->status[i].status = taskStatus->status;
    
D
dapan1121 已提交
1507
    ++i;
D
dapan1121 已提交
1508
    pIter = taosHashIterate(sch->tasksHash, pIter);
D
dapan1121 已提交
1509
  }  
D
dapan1121 已提交
1510

D
dapan1121 已提交
1511 1512 1513 1514
  QW_UNLOCK(QW_READ, &sch->tasksLock);
  qwReleaseScheduler(QW_READ, mgmt);

  (*rsp)->num = taskNum;
D
dapan1121 已提交
1515
*/
D
dapan1121 已提交
1516 1517 1518 1519
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1520

D
dapan1121 已提交
1521 1522
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
  SQWSchStatus *sch = NULL;
D
dapan1121 已提交
1523

D
dapan1121 已提交
1524
/*
D
dapan1121 已提交
1525
  QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
D
dapan1121 已提交
1526

D
dapan1121 已提交
1527
  sch->lastAccessTs = taosGetTimestampSec();
D
dapan1121 已提交
1528

D
dapan1121 已提交
1529
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1530
*/
D
dapan1121 已提交
1531 1532 1533 1534
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1535 1536 1537 1538
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
  SQWSchStatus *sch = NULL;
  SQWTaskStatus *task = NULL;
  int32_t code = 0;
D
dapan1121 已提交
1539 1540

/*  
D
dapan1121 已提交
1541 1542 1543 1544
  if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
    *taskStatus = JOB_TASK_STATUS_NULL;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1545

D
dapan1121 已提交
1546 1547 1548 1549 1550 1551
  if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) {
    qwReleaseScheduler(QW_READ, mgmt);
    
    *taskStatus = JOB_TASK_STATUS_NULL;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1552

D
dapan1121 已提交
1553
  *taskStatus = task->status;
D
dapan1121 已提交
1554

D
dapan1121 已提交
1555 1556
  qwReleaseTask(QW_READ, sch);
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1557
*/
D
dapan1121 已提交
1558 1559 1560 1561 1562

  QW_RET(code);
}


D
dapan1121 已提交
1563 1564 1565
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
  SQWSchStatus *sch = NULL;
  SQWTaskStatus *task = NULL;
D
dapan1121 已提交
1566
  int32_t code = 0;
D
dapan1121 已提交
1567

D
dapan1121 已提交
1568
/*
D
dapan1121 已提交
1569
  QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
D
dapan1121 已提交
1570

D
dapan1121 已提交
1571
  QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
D
dapan1121 已提交
1572

D
dapan1121 已提交
1573

D
dapan1121 已提交
1574
  QW_LOCK(QW_WRITE, &task->lock);
D
dapan1121 已提交
1575

D
dapan1121 已提交
1576 1577 1578 1579 1580 1581 1582
  task->cancel = true;
  
  int8_t oriStatus = task->status;
  int8_t newStatus = 0;
  
  if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START || task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) {
    QW_UNLOCK(QW_WRITE, &task->lock);
D
dapan1121 已提交
1583

D
dapan1121 已提交
1584 1585 1586 1587 1588 1589 1590 1591
    qwReleaseTask(QW_READ, sch);
    qwReleaseScheduler(QW_READ, mgmt);
    
    return TSDB_CODE_SUCCESS;
  } else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
    QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
  } else {
    QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLING));
D
dapan1121 已提交
1592 1593
  }

D
dapan1121 已提交
1594 1595 1596 1597
  QW_UNLOCK(QW_WRITE, &task->lock);
  
  qwReleaseTask(QW_READ, sch);
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
1598

D
dapan1121 已提交
1599 1600 1601 1602 1603
  if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
    //TODO call executer to cancel subquery async
  }
  
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1604 1605

_return:
D
dapan1121 已提交
1606

D
dapan1121 已提交
1607 1608 1609 1610 1611 1612 1613 1614 1615
  if (task) {
    QW_UNLOCK(QW_WRITE, &task->lock);
    
    qwReleaseTask(QW_READ, sch);
  }

  if (sch) {
    qwReleaseScheduler(QW_READ, mgmt);
  }
D
dapan1121 已提交
1616
*/
D
dapan1121 已提交
1617

D
dapan1121 已提交
1618
  QW_RET(code);
D
dapan1121 已提交
1619 1620
}

D
dapan1121 已提交
1621