qworker.c 35.7 KB
Newer Older
1 2
#include "qworker.h"

dengyihao's avatar
dengyihao 已提交
3
#include "dataSinkMgt.h"
4
#include "executor.h"
D
dapan1121 已提交
5
#include "planner.h"
H
Haojun Liao 已提交
6
#include "query.h"
D
dapan1121 已提交
7 8
#include "qwInt.h"
#include "qwMsg.h"
dengyihao's avatar
dengyihao 已提交
9
#include "tcommon.h"
H
Haojun Liao 已提交
10
#include "tmsg.h"
11
#include "tname.h"
D
dapan1121 已提交
12

D
dapan1121 已提交
13
SQWorkerMgmt gQwMgmt = {
14 15 16
    .lock = 0,
    .qwRef = -1,
    .qwNum = 0,
D
dapan1121 已提交
17
};
18

D
dapan1121 已提交
19 20 21 22
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t         code = 0;
  SSchedulerHbRsp rsp = {0};
  SQWSchStatus   *sch = NULL;
D
dapan1121 已提交
23

D
dapan1121 已提交
24
  QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch));
D
dapan1121 已提交
25

D
dapan1121 已提交
26
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
27

D
dapan1121 已提交
28
  sch->hbBrokenTs = taosGetTimestampMs();
29

D
dapan1121 已提交
30 31 32 33
  if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
    sch->hbConnInfo.handle = NULL;
    sch->hbConnInfo.ahandle = NULL;
D
dapan1121 已提交
34

D
dapan1121 已提交
35 36 37
    QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
  } else {
    QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
D
dapan1121 已提交
38
  }
D
dapan1121 已提交
39

D
dapan1121 已提交
40
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
41

D
dapan1121 已提交
42
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
43

D
dapan1121 已提交
44
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
45 46
}

47 48
static void freeItem(void *param) {
  SExplainExecInfo *pInfo = param;
H
Haojun Liao 已提交
49 50 51
  taosMemoryFree(pInfo->verboseInfo);
}

D
dapan1121 已提交
52
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
D
dapan1121 已提交
53
  qTaskInfo_t taskHandle = ctx->taskHandle;
D
dapan1121 已提交
54

D
dapan1121 已提交
55
  if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
D
dapan1121 已提交
56
    if (ctx->explain) {
57
      SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
H
Haojun Liao 已提交
58
      QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
59

D
dapan1121 已提交
60
      if (ctx->localExec) {
61 62 63 64 65 66 67 68 69 70 71
        SExplainLocalRsp localRsp = {0};
        localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
        SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
        memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
        localRsp.rsp.subplanInfo = pExec;
        localRsp.qId = qId;
        localRsp.tId = tId;
        localRsp.rId = rId;
        localRsp.eId = eId;
        taosArrayPush(ctx->explainRes, &localRsp);
        taosArrayDestroy(execInfoList);
D
dapan1121 已提交
72 73 74 75 76 77 78
      } else {
        SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
        connInfo.ahandle = NULL;
        int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
        taosArrayDestroyEx(execInfoList, freeItem);
        QW_ERR_RET(code);
      }
D
dapan1121 已提交
79
    }
D
dapan1121 已提交
80 81 82 83

    if (!ctx->needFetch) {
      dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL);
    }
D
dapan1121 已提交
84 85 86 87 88
  }

  return TSDB_CODE_SUCCESS;
}

89
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
dengyihao's avatar
dengyihao 已提交
90 91 92 93 94
  int32_t        code = 0;
  bool           qcontinue = true;
  uint64_t       useconds = 0;
  int32_t        i = 0;
  int32_t        execNum = 0;
D
dapan1121 已提交
95
  qTaskInfo_t    taskHandle = ctx->taskHandle;
D
dapan1121 已提交
96
  DataSinkHandle sinkHandle = ctx->sinkHandle;
97
  SLocalFetch    localFetch = {(void*)mgmt, qWorkerProcessLocalFetch, ctx->explainRes};
dengyihao's avatar
dengyihao 已提交
98

99
  SArray *pResList = taosArrayInit(4, POINTER_BYTES);
D
dapan1121 已提交
100
  while (true) {
H
Haojun Liao 已提交
101
    QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
D
dapan1121 已提交
102

D
dapan1121 已提交
103 104
    // if *taskHandle is NULL, it's killed right now
    if (taskHandle) {
D
dapan1121 已提交
105
      qwDbgSimulateSleep();
D
dapan1121 已提交
106
      code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch);
D
dapan1121 已提交
107
      if (code) {
108 109 110 111 112
        if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
          QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        } else {
          QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        }
113
        QW_ERR_JRET(code);
D
dapan1121 已提交
114
      }
D
dapan1121 已提交
115 116
    }

D
dapan1121 已提交
117 118
    ++execNum;

H
Haojun Liao 已提交
119
    if (taosArrayGetSize(pResList) == 0) {
dengyihao's avatar
dengyihao 已提交
120
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
D
dapan1121 已提交
121
      dsEndPut(sinkHandle, useconds);
D
dapan1121 已提交
122

123
      QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
D
dapan1121 已提交
124

125 126
      if (queryStop) {
        *queryStop = true;
D
dapan1121 已提交
127
      }
dengyihao's avatar
dengyihao 已提交
128

D
dapan1121 已提交
129 130 131
      break;
    }

132
    for (int32_t j = 0; j < taosArrayGetSize(pResList); ++j) {
H
Haojun Liao 已提交
133 134
      SSDataBlock *pRes = taosArrayGetP(pResList, j);
      ASSERT(pRes->info.rows > 0);
H
Haojun Liao 已提交
135

H
Haojun Liao 已提交
136 137 138 139
      SInputData inputData = {.pData = pRes};
      code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
      if (code) {
        QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
140
        QW_ERR_JRET(code);
H
Haojun Liao 已提交
141
      }
142

H
Haojun Liao 已提交
143
      QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
D
dapan1121 已提交
144
    }
D
dapan1121 已提交
145

D
dapan1121 已提交
146
    if (!qcontinue) {
147 148 149
      if (queryStop) {
        *queryStop = true;
      }
150

D
dapan1121 已提交
151 152 153
      break;
    }

D
dapan1121 已提交
154
    if (ctx->needFetch && (!ctx->queryRsped) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
D
dapan1121 已提交
155 156 157
      break;
    }

D
dapan1121 已提交
158
    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
159 160
      break;
    }
D
dapan1121 已提交
161

D
dapan1121 已提交
162 163 164
    if (atomic_load_32(&ctx->rspCode)) {
      break;
    }
D
dapan1121 已提交
165 166
  }

167 168
_return:

D
dapan1121 已提交
169 170
  taosArrayDestroy(pResList);
  QW_RET(code);
D
dapan1121 已提交
171
}
D
dapan1121 已提交
172

D
dapan1121 已提交
173
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
D
dapan1121 已提交
174 175
  int32_t taskNum = 0;

D
dapan1121 已提交
176
  hbInfo->connInfo = sch->hbConnInfo;
D
dapan1121 已提交
177
  hbInfo->rsp.epId = sch->hbEpId;
D
dapan1121 已提交
178

D
dapan1121 已提交
179
  QW_LOCK(QW_READ, &sch->tasksLock);
dengyihao's avatar
dengyihao 已提交
180

D
dapan1121 已提交
181
  taskNum = taosHashGetSize(sch->tasksHash);
D
dapan1121 已提交
182 183 184

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
D
dapan1121 已提交
185
    QW_UNLOCK(QW_READ, &sch->tasksLock);
D
dapan1121 已提交
186
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
D
dapan1121 已提交
187 188 189
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

190
  void       *key = NULL;
dengyihao's avatar
dengyihao 已提交
191 192
  size_t      keyLen = 0;
  int32_t     i = 0;
D
dapan1121 已提交
193
  STaskStatus status = {0};
D
dapan1121 已提交
194 195 196 197

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
D
dapan1121 已提交
198
    key = taosHashGetKey(pIter, &keyLen);
D
dapan1121 已提交
199

dengyihao's avatar
dengyihao 已提交
200
    // TODO GET EXECUTOR API TO GET MORE INFO
D
dapan1121 已提交
201

D
dapan1121 已提交
202
    QW_GET_QTID(key, status.queryId, status.taskId, status.execId);
D
dapan1121 已提交
203 204
    status.status = taskStatus->status;
    status.refId = taskStatus->refId;
dengyihao's avatar
dengyihao 已提交
205

D
dapan1121 已提交
206
    taosArrayPush(hbInfo->rsp.taskStatus, &status);
dengyihao's avatar
dengyihao 已提交
207

D
dapan1121 已提交
208 209
    ++i;
    pIter = taosHashIterate(sch->tasksHash, pIter);
dengyihao's avatar
dengyihao 已提交
210
  }
D
dapan1121 已提交
211 212 213 214 215 216

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
217
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
D
dapan1121 已提交
218
  int64_t            len = 0;
D
dapan1121 已提交
219
  SRetrieveTableRsp *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
220 221
  bool               queryEnd = false;
  int32_t            code = 0;
222
  SOutputData        output = {0};
D
dapan1121 已提交
223

224
  *dataLen = 0;
D
dapan1121 已提交
225

226 227
  while (true) {
    dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
D
dapan1121 已提交
228

229 230 231 232
    if (len < 0) {
      QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
      QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }
dengyihao's avatar
dengyihao 已提交
233

234 235 236 237 238 239 240
    if (len == 0) {
      if (queryEnd) {
        code = dsGetDataBlock(ctx->sinkHandle, &output);
        if (code) {
          QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
          QW_ERR_RET(code);
        }
H
Haojun Liao 已提交
241

242 243
        QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks,
                     pOutput->numOfRows);
244

245 246
        qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
        if (NULL == rsp) {
D
dapan1121 已提交
247
          QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp));
248 249 250 251 252 253 254 255 256 257 258 259 260
          *pOutput = output;
        } else {
          pOutput->queryEnd = output.queryEnd;
          pOutput->bufStatus = output.bufStatus;
          pOutput->useconds = output.useconds;
        }

        break;
      }

      pOutput->bufStatus = DS_BUF_EMPTY;

      break;
D
dapan1121 已提交
261
    }
D
dapan1121 已提交
262

263 264
    // Got data from sink
    QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
D
dapan1121 已提交
265

266
    *dataLen += len;
D
dapan1121 已提交
267

D
dapan1121 已提交
268
    QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp));
D
dapan1121 已提交
269

270 271 272 273 274 275
    output.pData = rsp->data + *dataLen - len;
    code = dsGetDataBlock(ctx->sinkHandle, &output);
    if (code) {
      QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
    }
dengyihao's avatar
dengyihao 已提交
276

277 278 279 280 281 282 283 284 285 286
    pOutput->queryEnd = output.queryEnd;
    pOutput->precision = output.precision;
    pOutput->bufStatus = output.bufStatus;
    pOutput->useconds = output.useconds;
    pOutput->compressed = output.compressed;
    pOutput->numOfCols = output.numOfCols;
    pOutput->numOfRows += output.numOfRows;
    pOutput->numOfBlocks++;

    if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
287 288
      QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks,
                   pOutput->numOfRows);
289 290 291
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
      break;
    }
dengyihao's avatar
dengyihao 已提交
292

293 294 295 296
    if (0 == ctx->level) {
      QW_TASK_DLOG("task fetched blocks %d rows %d, level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level);
      break;
    }
D
dapan1121 已提交
297

298 299 300 301
    if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
      QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
      break;
    }
D
dapan1121 已提交
302 303
  }

304 305
  *rspMsg = rsp;

D
dapan1121 已提交
306
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
307 308
}

309
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
310 311 312 313
  int64_t     len = 0;
  bool        queryEnd = false;
  int32_t     code = 0;
  SOutputData output = {0};
D
dapan1121 已提交
314 315 316

  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);

D
dapan1121 已提交
317
  if (len <= 0 || len != sizeof(SDeleterRes)) {
D
dapan1121 已提交
318
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
D
dapan1121 已提交
319 320 321 322 323 324 325
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  output.pData = taosMemoryCalloc(1, len);
  if (NULL == output.pData) {
    QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
326

D
dapan1121 已提交
327 328 329 330 331 332 333
  code = dsGetDataBlock(ctx->sinkHandle, &output);
  if (code) {
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
    taosMemoryFree(output.pData);
    QW_ERR_RET(code);
  }

334 335
  SDeleterRes *pDelRes = (SDeleterRes *)output.pData;

D
dapan1121 已提交
336
  pRes->suid = pDelRes->suid;
D
dapan1121 已提交
337 338 339
  pRes->uidList = pDelRes->uidList;
  pRes->skey = pDelRes->skey;
  pRes->ekey = pDelRes->ekey;
340
  pRes->affectedRows = pDelRes->affectedRows;
341 342
  strcpy(pRes->tableFName, pDelRes->tableName);
  strcpy(pRes->tsColName, pDelRes->tsColName);
D
dapan1121 已提交
343
  taosMemoryFree(output.pData);
344

D
dapan1121 已提交
345 346 347
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
348
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
349 350
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
351

D
dapan1121 已提交
352
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
353

D
dapan1121 已提交
354
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
355

D
dapan1121 已提交
356
  QW_LOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
357

D
dapan1121 已提交
358
  QW_SET_PHASE(ctx, phase);
D
dapan1121 已提交
359

dengyihao's avatar
dengyihao 已提交
360
  if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
D
dapan1121 已提交
361 362 363
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }
D
dapan1121 已提交
364

D
dapan1121 已提交
365 366
  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
D
dapan1121 已提交
367
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
H
Haojun Liao 已提交
368
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
369
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
370 371
        break;
      }
D
dapan1121 已提交
372

D
dapan1121 已提交
373
      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
374
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
375

376 377
        // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
378

D
dapan1121 已提交
379
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan 已提交
380
        break;
D
dapan1121 已提交
381
      }
D
dapan1121 已提交
382

D
dapan1121 已提交
383
      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
D
dapan1121 已提交
384 385
      break;
    }
D
dapan1121 已提交
386
    case QW_PHASE_PRE_FETCH: {
D
dapan1121 已提交
387
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
388
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
389 390
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }
D
dapan1121 已提交
391

D
dapan1121 已提交
392
      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
393
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
394 395 396
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

D
dapan1121 已提交
397
      if (!ctx->queryRsped) {
D
dapan1121 已提交
398
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
399
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
D
dapan1121 已提交
400 401
      }
      break;
dengyihao's avatar
dengyihao 已提交
402
    }
D
dapan1121 已提交
403
    case QW_PHASE_PRE_CQUERY: {
D
dapan1121 已提交
404
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
405
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
406
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
407
      }
D
dapan1121 已提交
408

D
dapan1121 已提交
409
      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
410
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
H
Haojun Liao 已提交
411

412 413
        // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
414

D
dapan1121 已提交
415
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
416
      }
D
dapan1121 已提交
417

D
dapan1121 已提交
418
      break;
D
dapan1121 已提交
419 420 421 422 423 424 425
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
426
    QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode));
D
dapan1121 已提交
427
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
428
  }
D
dapan1121 已提交
429

D
dapan1121 已提交
430
_return:
D
dapan1121 已提交
431

D
dapan1121 已提交
432
  if (ctx) {
D
dapan1121 已提交
433
    QW_UPDATE_RSP_CODE(ctx, code);
dengyihao's avatar
dengyihao 已提交
434

D
dapan1121 已提交
435
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
436 437
    qwReleaseTaskCtx(mgmt, ctx);
  }
D
dapan1121 已提交
438

439 440 441 442 443
  if (code != TSDB_CODE_SUCCESS) {
    QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  } else {
    QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  }
D
dapan1121 已提交
444 445 446 447 448

  QW_RET(code);
}

int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
449 450 451
  int32_t        code = 0;
  SQWTaskCtx    *ctx = NULL;
  SRpcHandleInfo connInfo = {0};
D
dapan1121 已提交
452

D
dapan1121 已提交
453
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
dengyihao's avatar
dengyihao 已提交
454

D
dapan1121 已提交
455
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
456

D
dapan1121 已提交
457 458
  QW_LOCK(QW_WRITE, &ctx->lock);

D
dapan1121 已提交
459
  if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
460
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
461 462 463
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

D
dapan1121 已提交
464
  if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
465 466 467 468
    if (QW_PHASE_POST_FETCH == phase) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }
D
dapan1121 已提交
469

470 471
    // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
472

D
dapan1121 已提交
473 474
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
475 476 477
  }

  if (ctx->rspCode) {
dengyihao's avatar
dengyihao 已提交
478 479
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
D
dapan1121 已提交
480
    QW_ERR_JRET(ctx->rspCode);
dengyihao's avatar
dengyihao 已提交
481
  }
D
dapan1121 已提交
482

D
dapan1121 已提交
483
  QW_ERR_JRET(input->code);
D
dapan1121 已提交
484 485 486

_return:

D
dapan1121 已提交
487
  if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
D
dapan1121 已提交
488
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
D
dapan1121 已提交
489 490
  }

D
dapan1121 已提交
491
  if (QW_PHASE_POST_QUERY == phase && ctx) {
D
dapan1121 已提交
492 493 494 495 496 497 498 499 500
    if (!ctx->localExec) {
      bool   rsped = false;
      SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
      qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
      qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
      if (!rsped) {
        qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
        QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
      }
501
    }
D
dapan1121 已提交
502 503
    
    ctx->queryRsped = true;
504 505
  }

D
dapan1121 已提交
506
  if (ctx) {
D
dapan1121 已提交
507
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
508

D
dapan1121 已提交
509
    QW_SET_PHASE(ctx, phase);
dengyihao's avatar
dengyihao 已提交
510

D
dapan1121 已提交
511
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
512
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
513 514
  }

D
dapan1121 已提交
515
  if (code) {
D
dapan1121 已提交
516
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
D
dapan1121 已提交
517 518
  }

D
dapan1121 已提交
519
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
520

D
dapan1121 已提交
521 522 523
  QW_RET(code);
}

D
dapan1121 已提交
524 525 526 527 528 529
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
  QW_ERR_RET(qwDropTask(QW_FPARAMS()));

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
530
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
531 532
  int32_t        code = 0;
  SQWTaskCtx    *ctx = NULL;
D
dapan1121 已提交
533

D
dapan1121 已提交
534
  QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
D
dapan1121 已提交
535

D
dapan1121 已提交
536 537 538
  QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
539

D
dapan1121 已提交
540 541
  ctx->ctrlConnInfo = qwMsg->connInfo;

D
dapan1121 已提交
542
  QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
D
dapan1121 已提交
543 544 545 546 547 548 549 550 551 552 553

_return:

  if (ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  QW_RET(TSDB_CODE_SUCCESS);
}

554
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
D
dapan1121 已提交
555 556 557 558 559 560 561 562
  int32_t        code = 0;
  bool           queryRsped = false;
  SSubplan      *plan = NULL;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx    *ctx = NULL;

D
dapan1121 已提交
563
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
D
dapan1121 已提交
564 565

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
566

D
dapan1121 已提交
567 568 569
  ctx->taskType = qwMsg->msgInfo.taskType;
  ctx->explain = qwMsg->msgInfo.explain;
  ctx->needFetch = qwMsg->msgInfo.needFetch;
D
dapan1121 已提交
570
  ctx->msgType = qwMsg->msgType;
D
dapan1121 已提交
571
  ctx->localExec = false;
X
Xiaoyu Wang 已提交
572

573
  // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
X
Xiaoyu Wang 已提交
574

575
  code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
D
dapan1121 已提交
576
  if (TSDB_CODE_SUCCESS != code) {
577 578
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
579
    QW_ERR_JRET(code);
D
dapan1121 已提交
580
  }
dengyihao's avatar
dengyihao 已提交
581

582
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
583
  sql = NULL;
D
dapan1121 已提交
584
  if (code) {
D
dapan1121 已提交
585
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
586
    QW_ERR_JRET(code);
D
dapan1121 已提交
587
  }
D
dapan1121 已提交
588

H
Haojun Liao 已提交
589
  if (NULL == sinkHandle || NULL == pTaskInfo) {
D
dapan1121 已提交
590 591 592 593
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

594
  ctx->level = plan->level;
D
dapan1121 已提交
595 596 597
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

D
dapan1121 已提交
598 599
  qwSaveTbVersionInfo(pTaskInfo, ctx);
  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
dengyihao's avatar
dengyihao 已提交
600

D
dapan1121 已提交
601 602
_return:

D
dapan1121 已提交
603
  taosMemoryFree(sql);
604

D
dapan1121 已提交
605
  input.code = code;
D
dapan1121 已提交
606
  input.msgType = qwMsg->msgType;
D
dapan1121 已提交
607
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
608

D
dapan1121 已提交
609
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
610 611
}

D
dapan1121 已提交
612
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
613
  SQWTaskCtx   *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
614
  int32_t       code = 0;
615
  SQWPhaseInput input = {0};
616
  void         *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
617
  int32_t       dataLen = 0;
618
  bool          queryStop = false;
dengyihao's avatar
dengyihao 已提交
619

D
dapan1121 已提交
620
  do {
D
dapan1121 已提交
621
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
D
dapan1121 已提交
622

D
dapan1121 已提交
623
    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
624

dengyihao's avatar
dengyihao 已提交
625 626
    atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t *)&ctx->queryContinue, 0);
D
dapan1121 已提交
627

628
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
D
dapan1121 已提交
629

D
dapan1121 已提交
630
    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
631
      SOutputData sOutput = {0};
D
dapan1121 已提交
632
      QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
dengyihao's avatar
dengyihao 已提交
633 634

      if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
635
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
dengyihao's avatar
dengyihao 已提交
636 637

        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
638
      }
dengyihao's avatar
dengyihao 已提交
639

D
dapan1121 已提交
640
      if (rsp) {
D
dapan1121 已提交
641
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
642

D
dapan1121 已提交
643
        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
644
        if (qComplete) {
dengyihao's avatar
dengyihao 已提交
645
          atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
646
        }
H
Haojun Liao 已提交
647

D
dapan1121 已提交
648
        qwMsg->connInfo = ctx->dataConnInfo;
dengyihao's avatar
dengyihao 已提交
649 650
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

D
dapan1121 已提交
651
        qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
D
dapan1121 已提交
652
        rsp = NULL;
653

dengyihao's avatar
dengyihao 已提交
654 655
        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
                     tstrerror(code), dataLen);
D
dapan1121 已提交
656
      } else {
dengyihao's avatar
dengyihao 已提交
657
        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
658 659 660
      }
    }

dengyihao's avatar
dengyihao 已提交
661
  _return:
662

D
dapan1121 已提交
663 664 665 666
    if (NULL == ctx) {
      break;
    }

D
dapan1121 已提交
667
    if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
dengyihao's avatar
dengyihao 已提交
668
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
669 670
      qwFreeFetchRsp(rsp);
      rsp = NULL;
dengyihao's avatar
dengyihao 已提交
671

D
dapan1121 已提交
672
      qwMsg->connInfo = ctx->dataConnInfo;
D
dapan1121 已提交
673
      qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code);
dengyihao's avatar
dengyihao 已提交
674 675
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                   0);
676
    }
D
dapan1121 已提交
677

D
dapan1121 已提交
678
    QW_LOCK(QW_WRITE, &ctx->lock);
679
    if (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
D
dapan1121 已提交
680 681
      // Note: query is not running anymore
      QW_SET_PHASE(ctx, 0);
dengyihao's avatar
dengyihao 已提交
682
      QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
683 684
      break;
    }
dengyihao's avatar
dengyihao 已提交
685
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
686
  } while (true);
D
dapan1121 已提交
687

D
dapan1121 已提交
688
  input.code = code;
dengyihao's avatar
dengyihao 已提交
689
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);
D
dapan1121 已提交
690

dengyihao's avatar
dengyihao 已提交
691
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
692
}
D
dapan1121 已提交
693

D
dapan1121 已提交
694
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
695 696 697
  int32_t       code = 0;
  int32_t       dataLen = 0;
  bool          locked = false;
698 699
  SQWTaskCtx   *ctx = NULL;
  void         *rsp = NULL;
D
dapan1121 已提交
700
  SQWPhaseInput input = {0};
D
dapan1121 已提交
701

D
dapan1121 已提交
702
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
703

704
  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
705

D
dapan1121 已提交
706
  ctx->msgType = qwMsg->msgType;
D
dapan1121 已提交
707
  ctx->dataConnInfo = qwMsg->connInfo;
D
dapan1121 已提交
708

D
dapan 已提交
709
  SOutputData sOutput = {0};
D
dapan1121 已提交
710
  QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
D
dapan1121 已提交
711

712 713
  if (NULL == rsp) {
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
D
dapan1121 已提交
714
  } else {
D
dapan1121 已提交
715
    bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
716

D
dapan1121 已提交
717
    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
718
    if (qComplete) {
dengyihao's avatar
dengyihao 已提交
719
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
720
    }
D
dapan1121 已提交
721 722
  }

dengyihao's avatar
dengyihao 已提交
723
  if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
724
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
D
dapan1121 已提交
725

D
dapan1121 已提交
726 727
    QW_LOCK(QW_WRITE, &ctx->lock);
    locked = true;
728

D
dapan1121 已提交
729
    // RC WARNING
D
dapan1121 已提交
730
    if (QW_QUERY_RUNNING(ctx)) {
dengyihao's avatar
dengyihao 已提交
731 732
      atomic_store_8((int8_t *)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
D
dapan1121 已提交
733
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);
D
dapan1121 已提交
734

dengyihao's avatar
dengyihao 已提交
735 736
      atomic_store_8((int8_t *)&ctx->queryInQueue, 1);

D
dapan1121 已提交
737
      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
738
    }
D
dapan 已提交
739
  }
dengyihao's avatar
dengyihao 已提交
740

D
dapan1121 已提交
741
_return:
D
dapan1121 已提交
742

D
dapan1121 已提交
743 744 745 746 747
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
D
dapan1121 已提交
748
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
D
dapan1121 已提交
749

D
dapan 已提交
750 751 752
  if (code) {
    qwFreeFetchRsp(rsp);
    rsp = NULL;
D
dapan1121 已提交
753
    dataLen = 0;
D
dapan1121 已提交
754 755 756
  }

  if (code || rsp) {
D
dapan1121 已提交
757
    bool rsped = false;
D
dapan1121 已提交
758
    if (ctx) {
759
      qwDbgSimulateRedirect(qwMsg, ctx, &rsped);
D
dapan1121 已提交
760 761
      qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
    }
D
dapan1121 已提交
762 763
    if (!rsped) {
      qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
764 765
      QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
                   qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
D
dapan1121 已提交
766
    }
D
dapan1121 已提交
767 768
  }

D
dapan1121 已提交
769
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
770
}
D
dapan1121 已提交
771

D
dapan1121 已提交
772
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
773
  int32_t     code = 0;
D
dapan1121 已提交
774
  bool        dropped = false;
D
dapan1121 已提交
775
  SQWTaskCtx *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
776
  bool        locked = false;
D
dapan1121 已提交
777

D
dapan1121 已提交
778
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
779

D
dapan1121 已提交
780 781 782 783
  QW_LOCK(QW_WRITE, &ctx->lock);

  locked = true;

D
dapan1121 已提交
784
  if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
785
    QW_TASK_WLOG_E("task already dropping");
D
dapan1121 已提交
786 787 788
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

D
dapan1121 已提交
789
  if (QW_QUERY_RUNNING(ctx)) {
D
dapan1121 已提交
790
    QW_ERR_JRET(qwKillTaskHandle(ctx));
D
dapan1121 已提交
791
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
D
dapan1121 已提交
792
  } else {
D
dapan1121 已提交
793
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
794
    dropped = true;
D
dapan1121 已提交
795
  }
D
dapan1121 已提交
796

D
dapan1121 已提交
797
  if (!dropped) {
D
dapan1121 已提交
798 799
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }
800

D
dapan1121 已提交
801
_return:
D
dapan1121 已提交
802

D
dapan1121 已提交
803
  if (code) {
D
dapan1121 已提交
804 805 806
    if (ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }
H
Haojun Liao 已提交
807

D
dapan1121 已提交
808
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
D
dapan1121 已提交
809 810
  }

D
dapan 已提交
811 812 813 814
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

D
dapan1121 已提交
815
  if (ctx) {
D
dapan1121 已提交
816
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
817 818
  }

D
dapan1121 已提交
819 820 821
  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
822
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
dengyihao's avatar
dengyihao 已提交
823
  int32_t         code = 0;
D
dapan1121 已提交
824
  SSchedulerHbRsp rsp = {0};
825
  SQWSchStatus   *sch = NULL;
D
dapan1121 已提交
826

D
dapan1121 已提交
827 828 829
  if (qwMsg->code) {
    QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
  }
D
dapan1121 已提交
830 831

  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
D
dapan1121 已提交
832 833
  QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));

D
dapan1121 已提交
834 835
  sch->hbBrokenTs = 0;

D
dapan1121 已提交
836
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
837

D
dapan1121 已提交
838
  if (sch->hbConnInfo.handle) {
839
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
840
    sch->hbConnInfo.handle = NULL;
D
dapan1121 已提交
841
  }
D
dapan1121 已提交
842

D
dapan1121 已提交
843
  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
D
dapan1121 已提交
844
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
dengyihao's avatar
dengyihao 已提交
845

D
dapan1121 已提交
846
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
dengyihao's avatar
dengyihao 已提交
847 848 849

  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
          req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
850

D
dapan1121 已提交
851 852 853 854
  qwReleaseScheduler(QW_READ, mgmt);

_return:

D
dapan1121 已提交
855 856
  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));

S
shm  
Shengliang Guan 已提交
857
  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
D
dapan1121 已提交
858 859

  if (code) {
860
    tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
861
    qwMsg->connInfo.handle = NULL;
D
dapan1121 已提交
862
  }
dengyihao's avatar
dengyihao 已提交
863

864
  /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/
dengyihao's avatar
dengyihao 已提交
865

D
dapan1121 已提交
866
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
867 868 869
}

void qwProcessHbTimerEvent(void *param, void *tmrId) {
870
  SQWHbParam *hbParam = (SQWHbParam *)param;
D
dapan1121 已提交
871 872 873 874
  if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
    return;
  }

875
  int64_t   refId = hbParam->refId;
D
dapan1121 已提交
876 877 878 879 880
  SQWorker *mgmt = qwAcquire(refId);
  if (NULL == mgmt) {
    QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
    return;
  }
881

D
dapan1121 已提交
882
  SQWSchStatus *sch = NULL;
dengyihao's avatar
dengyihao 已提交
883
  int32_t       taskNum = 0;
884
  SQWHbInfo    *rspList = NULL;
D
dapan1121 已提交
885
  SArray       *pExpiredSch = NULL;
dengyihao's avatar
dengyihao 已提交
886
  int32_t       code = 0;
D
dapan1121 已提交
887

D
dapan1121 已提交
888 889
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
890 891 892 893 894
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
895
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
896
    qwRelease(refId);
D
dapan1121 已提交
897
    return;
D
dapan1121 已提交
898 899
  }

wafwerar's avatar
wafwerar 已提交
900
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
901 902
  pExpiredSch = taosArrayInit(schNum, sizeof(uint64_t));
  if (NULL == rspList || NULL == pExpiredSch) {
D
dapan1121 已提交
903
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
904 905
    taosMemoryFree(rspList);
    taosArrayDestroy(pExpiredSch);
D
dapan1121 已提交
906 907
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
908
    qwRelease(refId);
D
dapan1121 已提交
909
    return;
D
dapan1121 已提交
910 911
  }

912
  void   *key = NULL;
dengyihao's avatar
dengyihao 已提交
913
  size_t  keyLen = 0;
D
dapan1121 已提交
914
  int32_t i = 0;
D
dapan1121 已提交
915
  int64_t currentMs = taosGetTimestampMs();
D
dapan1121 已提交
916 917 918

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
D
dapan1121 已提交
919 920 921
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
    if (NULL == sch->hbConnInfo.handle) {
      uint64_t *sId = taosHashGetKey(pIter, NULL);
D
dapan1121 已提交
922
      QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
D
dapan1121 已提交
923

924 925
      if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
          taosHashGetSize(sch->tasksHash) <= 0) {
D
dapan1121 已提交
926 927
        taosArrayPush(pExpiredSch, sId);
      }
928

D
dapan1121 已提交
929 930 931
      pIter = taosHashIterate(mgmt->schHash, pIter);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
932

D
dapan1121 已提交
933 934 935 936 937 938 939 940 941 942 943 944 945 946 947
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
948
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
949 950
    /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
    /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
D
dapan1121 已提交
951
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
952 953
  }

D
dapan1121 已提交
954
  if (taosArrayGetSize(pExpiredSch) > 0) {
D
dapan1121 已提交
955
    qwClearExpiredSch(mgmt, pExpiredSch);
D
dapan1121 已提交
956 957
  }

wafwerar's avatar
wafwerar 已提交
958
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
959
  taosArrayDestroy(pExpiredSch);
D
dapan1121 已提交
960

dengyihao's avatar
dengyihao 已提交
961
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
962
  qwRelease(refId);
D
dapan1121 已提交
963 964
}

965
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
D
dapan1121 已提交
966 967 968 969 970 971
  int32_t        code = 0;
  SSubplan      *plan = NULL;
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx     ctx = {0};

972
  code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
D
dapan1121 已提交
973 974 975 976 977 978
  if (TSDB_CODE_SUCCESS != code) {
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

979
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
980 981 982 983 984 985 986 987 988 989
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
990 991
  ctx.taskHandle = pTaskInfo;
  ctx.sinkHandle = sinkHandle;
D
dapan1121 已提交
992

D
dapan1121 已提交
993
  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL));
D
dapan1121 已提交
994

995
  QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, pRes));
D
dapan1121 已提交
996 997 998

_return:

D
dapan1121 已提交
999
  qwFreeTaskCtx(&ctx);
D
dapan1121 已提交
1000 1001 1002 1003

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
1004 1005
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
  if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
D
dapan1121 已提交
1006 1007 1008
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
S
Shengliang 已提交
1009

D
dapan1121 已提交
1010 1011 1012 1013
  int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
  if (1 == qwNum) {
    memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
  }
D
dapan1121 已提交
1014 1015 1016 1017 1018 1019 1020 1021

  int32_t code = qwOpenRef();
  if (code) {
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
    QW_RET(code);
  }

  SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
D
dapan1121 已提交
1022
  if (NULL == mgmt) {
D
dapan1121 已提交
1023 1024
    qError("calloc %d failed", (int32_t)sizeof(SQWorker));
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1025
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1026 1027
  }

D
dapan1121 已提交
1028 1029 1030
  mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
  mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
  mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1031

dengyihao's avatar
dengyihao 已提交
1032 1033
  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                               HASH_ENTRY_LOCK);
D
dapan1121 已提交
1034
  if (NULL == mgmt->schHash) {
wafwerar's avatar
wafwerar 已提交
1035
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1036
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
D
dapan1121 已提交
1037
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1038 1039
  }

dengyihao's avatar
dengyihao 已提交
1040 1041
  mgmt->ctxHash =
      taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1042
  if (NULL == mgmt->ctxHash) {
D
dapan1121 已提交
1043
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
D
dapan1121 已提交
1044 1045 1046 1047 1048 1049 1050 1051 1052
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1053 1054
  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
D
dapan1121 已提交
1055 1056 1057 1058 1059
  if (pMsgCb) {
    mgmt->msgCb = *pMsgCb;
  } else {
    memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb));
  }
D
dapan1121 已提交
1060

D
dapan1121 已提交
1061 1062 1063 1064 1065 1066
  mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
  if (mgmt->refId < 0) {
    qError("taosAddRef qw failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(terrno);
  }

D
dapan1121 已提交
1067 1068 1069
  SQWHbParam *param = NULL;
  qwSetHbParam(mgmt->refId, &param);

1070
  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer);
D
dapan1121 已提交
1071 1072 1073 1074
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
1075

D
dapan1121 已提交
1076 1077
  *qWorkerMgmt = mgmt;

S
Shengliang Guan 已提交
1078
  qDebug("qworker initialized, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
D
dapan1121 已提交
1079

D
dapan1121 已提交
1080
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1081 1082 1083

_return:

D
dapan1121 已提交
1084 1085 1086 1087 1088 1089 1090
  if (mgmt->refId >= 0) {
    qwRelease(mgmt->refId);
  } else {
    taosHashCleanup(mgmt->schHash);
    taosHashCleanup(mgmt->ctxHash);
    taosTmrCleanUp(mgmt->timer);
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1091

1092
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1093
  }
1094

D
dapan1121 已提交
1095
  QW_RET(code);
D
dapan1121 已提交
1096
}
D
dapan1121 已提交
1097 1098 1099 1100

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1101
  }
D
dapan 已提交
1102

D
dapan1121 已提交
1103
  SQWorker *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1104

D
dapan1121 已提交
1105 1106 1107
  if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
    qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
  }
D
dapan1121 已提交
1108
}
D
dapan1121 已提交
1109

D
dapan1121 已提交
1110 1111 1112 1113 1114
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat) {
  if (NULL == handle || NULL == qWorkerMgmt || NULL == pStat) {
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

1115
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
1116
  SDataSinkStat sinkStat = {0};
1117

D
dapan1121 已提交
1118 1119
  dsDataSinkGetCacheSize(&sinkStat);
  pStat->cacheDataSize = sinkStat.cachedSize;
1120

D
dapan1121 已提交
1121 1122 1123 1124 1125
  pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed);
  pStat->cqueryProcessed = QW_STAT_GET(mgmt->stat.msgStat.cqueryProcessed);
  pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed);
  pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed);
  pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed);
D
dapan1121 已提交
1126
  pStat->deleteProcessed = QW_STAT_GET(mgmt->stat.msgStat.deleteProcessed);
D
dapan1121 已提交
1127 1128 1129 1130 1131 1132 1133

  pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE);
  pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE);
  pStat->timeInQueryQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, QUERY_QUEUE);
  pStat->timeInFetchQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, FETCH_QUEUE);

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1134
}
D
dapan1121 已提交
1135

D
dapan1121 已提交
1136 1137
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes) {
  SQWorker      *mgmt = (SQWorker*)pMgmt;
D
dapan1121 已提交
1138 1139 1140 1141 1142 1143
  int32_t        code = 0;
  SQWTaskCtx    *ctx = NULL;
  SSubplan      *plan = (SSubplan *)qwMsg->msg;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
D
dapan1121 已提交
1144
  SReadHandle rHandle = {0};
D
dapan1121 已提交
1145 1146

  QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));
D
dapan1121 已提交
1147
  QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
D
dapan1121 已提交
1148

D
dapan1121 已提交
1149
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
D
dapan1121 已提交
1150 1151 1152 1153 1154 1155
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  ctx->taskType = qwMsg->msgInfo.taskType;
  ctx->explain = qwMsg->msgInfo.explain;
  ctx->needFetch = qwMsg->msgInfo.needFetch;
  ctx->msgType = qwMsg->msgType;
D
dapan1121 已提交
1156 1157
  ctx->localExec = true;
  ctx->explainRes = explainRes;
D
dapan1121 已提交
1158

D
dapan1121 已提交
1159 1160 1161 1162
  rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
  rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;
  
  code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  ctx->level = plan->level;
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));

_return:

D
dapan1121 已提交
1181 1182 1183 1184 1185 1186
  taosMemoryFree(rHandle.pMsgCb);
  
  input.code = code;
  input.msgType = qwMsg->msgType;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);

D
dapan1121 已提交
1187 1188 1189 1190 1191 1192 1193 1194
  if (ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  QW_RET(code);
}

D
dapan1121 已提交
1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes) {
  SQWorker      *mgmt = (SQWorker*)pMgmt;
  int32_t       code = 0;
  int32_t       dataLen = 0;
  SQWTaskCtx   *ctx = NULL;
  void         *rsp = NULL;
  bool          queryStop = false;

  SQWPhaseInput input = {0};

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  ctx->msgType = TDMT_SCH_MERGE_FETCH;
  ctx->explainRes = explainRes;
  
  SOutputData sOutput = {0};
  
  while (true) {
    QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));

    if (NULL == rsp) {
      QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
      
      continue;
    } else {
      bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);

      qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
      if (qComplete) {
        atomic_store_8((int8_t *)&ctx->queryEnd, true);
      }

      break;
    }
  }

_return:

  *pRsp = rsp;

  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);

  QW_RET(code);
}

D
dapan1121 已提交
1243