qworker.c 35.9 KB
Newer Older
1 2
#include "qworker.h"

dengyihao's avatar
dengyihao 已提交
3
#include "dataSinkMgt.h"
4
#include "executor.h"
D
dapan1121 已提交
5
#include "planner.h"
H
Haojun Liao 已提交
6
#include "query.h"
D
dapan1121 已提交
7 8
#include "qwInt.h"
#include "qwMsg.h"
dengyihao's avatar
dengyihao 已提交
9
#include "tcommon.h"
H
Haojun Liao 已提交
10
#include "tmsg.h"
11
#include "tname.h"
H
Haojun Liao 已提交
12
#include "tdatablock.h"
D
dapan1121 已提交
13

D
dapan1121 已提交
14
SQWorkerMgmt gQwMgmt = {
15 16 17
    .lock = 0,
    .qwRef = -1,
    .qwNum = 0,
D
dapan1121 已提交
18
};
19

H
Haojun Liao 已提交
20 21 22 23 24
// Array-element destructor: `param` points at a slot holding an SSDataBlock
// pointer; the block that slot refers to is destroyed.
static void freeBlock(void* param) {
  blockDataDestroy(*(SSDataBlock**)param);
}

D
dapan1121 已提交
25 26 27 28
/**
 * Handle a broken heartbeat connection for the scheduler identified by req->sId.
 *
 * Records the time of the break on the scheduler entry and, if the broken
 * connection is the one currently stored as the scheduler's hb connection,
 * releases that handle and clears it. A break reported for any other handle
 * is only logged.
 *
 * @param mgmt   query worker instance
 * @param qwMsg  message carrying the connection info of the broken link
 * @param req    heartbeat request; only req->sId is read
 * @return TSDB_CODE_SUCCESS, or the error from qwAcquireScheduler
 */
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  SQWSchStatus *sch = NULL;

  QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch));

  QW_LOCK(QW_WRITE, &sch->hbConnLock);

  sch->hbBrokenTs = taosGetTimestampMs();

  if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
    sch->hbConnInfo.handle = NULL;
    sch->hbConnInfo.ahandle = NULL;

    QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
  } else {
    QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
  }

  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);

  qwReleaseScheduler(QW_READ, mgmt);

  QW_RET(TSDB_CODE_SUCCESS);
}

53 54
static void freeItem(void *param) {
  SExplainExecInfo *pInfo = param;
H
Haojun Liao 已提交
55 56 57
  taosMemoryFree(pInfo->verboseInfo);
}

D
dapan1121 已提交
58
/**
 * Post-processing once a task has produced all of its results.
 *
 * Only acts on TASK_TYPE_TEMP tasks that still have a task handle:
 *  - when ctx->explain is set, the executor's explain info is collected and
 *    either appended to ctx->explainRes (local execution) or sent back over
 *    the control connection;
 *  - when the task needs no fetch, the sink's data length is stored into
 *    ctx->affectedRows.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
 *         or the error from qGetExplainExecInfo / qwBuildAndSendExplainRsp
 */
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  qTaskInfo_t taskHandle = ctx->taskHandle;

  if (TASK_TYPE_TEMP != ctx->taskType || !taskHandle) {
    return TSDB_CODE_SUCCESS;
  }

  if (ctx->explain) {
    int32_t code = TSDB_CODE_SUCCESS;
    SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
    if (NULL == execInfoList) {
      QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    code = qGetExplainExecInfo(taskHandle, execInfoList);
    if (code) {
      // The list (and anything already stored in it) must not leak on failure.
      taosArrayDestroyEx(execInfoList, freeItem);
      QW_ERR_RET(code);
    }

    if (ctx->localExec) {
      SExplainLocalRsp localRsp = {0};
      localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);

      SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
      if (localRsp.rsp.numOfPlans > 0) {
        if (NULL == pExec) {
          taosArrayDestroyEx(execInfoList, freeItem);
          QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }

        memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
      }

      localRsp.rsp.subplanInfo = pExec;
      localRsp.qId = qId;
      localRsp.tId = tId;
      localRsp.rId = rId;
      localRsp.eId = eId;
      taosArrayPush(ctx->explainRes, &localRsp);

      // Plain destroy on purpose: the entries were bit-copied into pExec, so
      // their verboseInfo buffers are now referenced from the local response.
      taosArrayDestroy(execInfoList);
    } else {
      SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
      connInfo.ahandle = NULL;
      code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
      taosArrayDestroyEx(execInfoList, freeItem);
      QW_ERR_RET(code);
    }
  }

  if (!ctx->needFetch) {
    dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL);
  }

  return TSDB_CODE_SUCCESS;
}

95
/**
 * Run the executor for a task and push every produced block into its data sink.
 *
 * Each loop iteration executes the task once and then:
 *  - if no block came back, ends the sink input, runs qwHandleTaskComplete and
 *    stops (reporting *queryStop = true when queryStop is given);
 *  - otherwise puts every block into the sink and stops when the sink asks not
 *    to continue, when a short-run limit is reached before the query has been
 *    responded to, when a fetch event is pending, or when ctx->rspCode is set.
 *
 * @param ctx        task context; taskHandle / sinkHandle are read once on entry
 * @param queryStop  optional out flag, set to true when execution is finished
 * @return TSDB_CODE_SUCCESS or the first error hit (also stored in terrno)
 */
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
  int32_t        code = 0;
  bool           qcontinue = true;
  uint64_t       useconds = 0;
  int32_t        i = 0;
  int32_t        execNum = 0;
  qTaskInfo_t    taskHandle = ctx->taskHandle;
  DataSinkHandle sinkHandle = ctx->sinkHandle;
  SLocalFetch    localFetch = {(void *)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes};

  SArray *pResList = taosArrayInit(4, POINTER_BYTES);
  if (NULL == pResList) {
    // Without a result list the executor has nowhere to store blocks; bail out
    // instead of running the loop on a NULL array.
    QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  while (true) {
    QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);

    // if *taskHandle is NULL, it's killed right now
    if (taskHandle) {
      qwDbgSimulateSleep();
      code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch);
      if (code) {
        if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
          QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        } else {
          QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        }
        QW_ERR_JRET(code);
      }
    }

    ++execNum;

    if (taosArrayGetSize(pResList) == 0) {
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
      dsEndPut(sinkHandle, useconds);

      QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));

      if (queryStop) {
        *queryStop = true;
      }

      break;
    }

    for (int32_t j = 0; j < taosArrayGetSize(pResList); ++j) {
      SSDataBlock *pRes = taosArrayGetP(pResList, j);
      ASSERT(pRes->info.rows > 0);

      SInputData inputData = {.pData = pRes};
      code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
      if (code) {
        QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
        QW_ERR_JRET(code);
      }

      QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
    }

    if (!qcontinue) {
      if (queryStop) {
        *queryStop = true;
      }

      break;
    }

    // Short-run limit: give the first query response a chance to go out.
    if (ctx->needFetch && (!ctx->queryRsped) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
      break;
    }

    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      break;
    }

    if (atomic_load_32(&ctx->rspCode)) {
      break;
    }
  }

_return:
  taosArrayDestroyEx(pResList, freeBlock);
  QW_RET(code);
}
D
dapan1121 已提交
177

D
dapan1121 已提交
178
/**
 * Fill a heartbeat response for one scheduler.
 *
 * Copies the scheduler's hb connection info and endpoint id into hbInfo, then,
 * under the read lock on sch->tasksLock, snapshots every entry of
 * sch->tasksHash into a newly created hbInfo->rsp.taskStatus array.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the status
 *         array cannot be created
 */
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
  hbInfo->connInfo = sch->hbConnInfo;
  hbInfo->rsp.epId = sch->hbEpId;

  QW_LOCK(QW_READ, &sch->tasksLock);

  int32_t taskNum = taosHashGetSize(sch->tasksHash);

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
    QW_UNLOCK(QW_READ, &sch->tasksLock);
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  size_t      keyLen = 0;
  STaskStatus entry = {0};

  for (void *pIter = taosHashIterate(sch->tasksHash, NULL); pIter != NULL;
       pIter = taosHashIterate(sch->tasksHash, pIter)) {
    SQWTaskStatus *pTask = (SQWTaskStatus *)pIter;
    void          *key = taosHashGetKey(pIter, &keyLen);

    // TODO GET EXECUTOR API TO GET MORE INFO

    // The hash key encodes the query id, task id and exec id of the task.
    QW_GET_QTID(key, entry.queryId, entry.taskId, entry.execId);
    entry.status = pTask->status;
    entry.refId = pTask->refId;

    taosArrayPush(hbInfo->rsp.taskStatus, &entry);
  }

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
222
/**
 * Drain result data from the task's sink into one fetch response buffer.
 *
 * Loops over the sink: each pending chunk grows the response buffer and is
 * appended to it, while pOutput accumulates row/block counters and the latest
 * status flags. The loop stops when the sink is empty, when the query has
 * ended with nothing left, when ctx->level is 0 (one chunk per call), or once
 * QW_MIN_RES_ROWS rows have been gathered.
 *
 * @param dataLen  out: total payload bytes placed in the response
 * @param rspMsg   out: the response buffer, or NULL when there is nothing to
 *                 send yet. It is also set on failure so that a partially
 *                 built buffer can be released by the caller.
 * @param pOutput  in/out: accumulated output description
 * @return TSDB_CODE_SUCCESS or the first error hit (also stored in terrno)
 */
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
  int64_t            len = 0;
  SRetrieveTableRsp *rsp = NULL;
  bool               queryEnd = false;
  int32_t            code = 0;
  SOutputData        output = {0};

  *dataLen = 0;

  while (true) {
    dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);

    if (len < 0) {
      QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
      QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    if (len == 0) {
      if (queryEnd) {
        code = dsGetDataBlock(ctx->sinkHandle, &output);
        if (code) {
          QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
          QW_ERR_JRET(code);
        }

        QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks,
                     pOutput->numOfRows);

        qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
        if (NULL == rsp) {
          // Nothing was gathered in this call: still hand back a header-only response.
          QW_ERR_JRET(qwMallocFetchRsp(!ctx->localExec, len, &rsp));
          *pOutput = output;
        } else {
          pOutput->queryEnd = output.queryEnd;
          pOutput->bufStatus = output.bufStatus;
          pOutput->useconds = output.useconds;
        }

        break;
      }

      pOutput->bufStatus = DS_BUF_EMPTY;

      break;
    }

    // Got data from sink
    QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64, len);

    *dataLen += len;

    QW_ERR_JRET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp));

    // Append the new chunk right after the data gathered so far.
    output.pData = rsp->data + *dataLen - len;
    code = dsGetDataBlock(ctx->sinkHandle, &output);
    if (code) {
      QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_JRET(code);
    }

    pOutput->queryEnd = output.queryEnd;
    pOutput->precision = output.precision;
    pOutput->bufStatus = output.bufStatus;
    pOutput->useconds = output.useconds;
    pOutput->compressed = output.compressed;
    pOutput->numOfCols = output.numOfCols;
    pOutput->numOfRows += output.numOfRows;
    pOutput->numOfBlocks++;

    if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
      QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks,
                   pOutput->numOfRows);
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
      break;
    }

    if (0 == ctx->level) {
      QW_TASK_DLOG("task fetched blocks %d rows %d, level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level);
      break;
    }

    if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
      QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
      break;
    }
  }

_return:
  // Publish the buffer on every path; on failure this lets the caller's
  // error handling free it instead of it being lost here.
  // NOTE(review): assumes qwMallocFetchRsp leaves *rsp valid when it fails - confirm.
  *rspMsg = rsp;

  QW_RET(code);
}

314
/**
 * Read the single delete result record from the task's sink into pRes.
 *
 * The sink must hold exactly sizeof(SDeleterRes) bytes; anything else is
 * rejected as TSDB_CODE_QRY_INVALID_INPUT. The record is fetched into a
 * temporary buffer, copied field by field and the buffer is released.
 * Note that uidList is copied as a pointer, not duplicated.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT,
 *         TSDB_CODE_OUT_OF_MEMORY, or the error from dsGetDataBlock
 */
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
  SOutputData sinkOut = {0};
  int32_t     code = 0;
  int64_t     resLen = 0;
  bool        ended = false;

  dsGetDataLength(ctx->sinkHandle, &resLen, &ended);

  if (resLen <= 0 || resLen != sizeof(SDeleterRes)) {
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, resLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  sinkOut.pData = taosMemoryCalloc(1, resLen);
  if (NULL == sinkOut.pData) {
    QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  code = dsGetDataBlock(ctx->sinkHandle, &sinkOut);
  if (code) {
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
    taosMemoryFree(sinkOut.pData);
    QW_ERR_RET(code);
  }

  SDeleterRes *pSrc = (SDeleterRes *)sinkOut.pData;

  // Names first, then the scalar fields; the copies are independent of each other.
  strcpy(pRes->tableFName, pSrc->tableName);
  strcpy(pRes->tsColName, pSrc->tsColName);
  pRes->suid = pSrc->suid;
  pRes->uidList = pSrc->uidList;
  pRes->skey = pSrc->skey;
  pRes->ekey = pSrc->ekey;
  pRes->affectedRows = pSrc->affectedRows;

  taosMemoryFree(sinkOut.pData);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
353
/**
 * Gatekeeper run before a query / fetch / continue-query message is processed.
 *
 * Acquires and write-locks the task context, records the new phase, and
 * rejects the message when the query has already ended, when a drop has been
 * processed or is pending (dropping the task where the phase allows it), when
 * a previous fetch is still outstanding, or when the task already carries an
 * error in ctx->rspCode. For QW_PHASE_PRE_QUERY the task status is moved to
 * JOB_TASK_STATUS_EXEC.
 *
 * Whatever the outcome, the resulting code is merged into the context via
 * QW_UPDATE_RSP_CODE before the lock and the reference are released.
 *
 * @param phase   one of QW_PHASE_PRE_QUERY / PRE_FETCH / PRE_CQUERY
 * @param input   not read by this function
 * @param output  not used by this function
 * @return TSDB_CODE_SUCCESS when processing may continue, an error otherwise
 */
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;

  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);

  QW_SET_PHASE(ctx, phase);

  if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }

  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
      }

      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));

        // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));

        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }

      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
      break;
    }
    case QW_PHASE_PRE_FETCH: {
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }

      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

      if (!ctx->queryRsped) {
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
      }
      break;
    }
    case QW_PHASE_PRE_CQUERY: {
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }

      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));

        // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));

        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }

      break;
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
    QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode));
    QW_ERR_JRET(ctx->rspCode);
  }

_return:

  // ctx stays NULL when the context could not be acquired; nothing to unlock then.
  if (ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);

    QW_UNLOCK(QW_WRITE, &ctx->lock);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  if (code != TSDB_CODE_SUCCESS) {
    QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  } else {
    QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  }

  QW_RET(code);
}

/**
 * Wrap-up run after a query / fetch / continue-query message was processed.
 *
 * Under the task context's write lock it folds together, in this order, a
 * drop that was already processed, a pending drop (executed here, except in
 * QW_PHASE_POST_FETCH where it is reported as an error), an error already
 * stored in ctx->rspCode, and the processing result passed in input->code.
 *
 * For QW_PHASE_POST_QUERY it additionally marks the task as partially
 * succeeded on success, sends the query response for non-local execution, and
 * sets ctx->queryRsped. The final code is merged into the context, the phase
 * is recorded, and on failure the task status is set to JOB_TASK_STATUS_FAIL.
 *
 * @param phase   one of the QW_PHASE_POST_* values
 * @param input   input->code is the result of the processing step;
 *                input->msgType is used to derive the response message type
 * @param output  not used by this function
 * @return the resulting code (also stored in terrno when non-zero)
 */
int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;

  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    if (QW_PHASE_POST_FETCH == phase) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));

    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (ctx->rspCode) {
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
    QW_ERR_JRET(ctx->rspCode);
  }

  QW_ERR_JRET(input->code);

_return:

  if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
  }

  if (QW_PHASE_POST_QUERY == phase && ctx) {
    if (!ctx->localExec) {
      bool   rsped = false;
      SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
      // Debug hooks may answer the request themselves; they report that via rsped.
      qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
      qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
      if (!rsped) {
        qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
        QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
      }
    }

    ctx->queryRsped = true;
  }

  // ctx stays NULL when the context could not be acquired; nothing to unlock then.
  if (ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);

    QW_SET_PHASE(ctx, phase);

    QW_UNLOCK(QW_WRITE, &ctx->lock);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
  }

  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));

  QW_RET(code);
}

D
dapan1121 已提交
529 530 531 532 533 534
// Undo a query pre-processing step by dropping the task.
// Returns the error from qwDropTask, or TSDB_CODE_SUCCESS.
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
  int32_t dropCode = qwDropTask(QW_FPARAMS());
  QW_ERR_RET(dropCode);

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
535
/**
 * Prepare the bookkeeping for an incoming query message before it is queued.
 *
 * Registers the broken-link argument on the message's connection, creates the
 * task context, remembers the connection as the task's control connection and
 * adds the task status in JOB_TASK_STATUS_INIT.
 *
 * A failure in any step is merged into the task context (when one could be
 * acquired) via QW_UPDATE_RSP_CODE; the function itself always reports
 * TSDB_CODE_SUCCESS to its caller.
 */
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  SQWTaskCtx *ctx = NULL;
  int32_t     code = 0;

  QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
  QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  ctx->ctrlConnInfo = qwMsg->connInfo;

  QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));

_return:

  if (NULL != ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  QW_RET(TSDB_CODE_SUCCESS);
}

559
/**
 * Process a query message: build the sub-plan, create the executor task and
 * run it for the first time.
 *
 * The outcome of these steps is handed to qwHandlePostPhaseEvents
 * (QW_PHASE_POST_QUERY) through input.code; this function itself always
 * returns TSDB_CODE_SUCCESS.
 *
 * @param qwMsg  incoming message (plan payload, message type, task flags)
 * @param sql    SQL text passed on to qCreateExecTask; it is freed here only
 *               if the function bails out before that call
 */
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
  int32_t        code = 0;
  SSubplan      *plan = NULL;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx    *ctx = NULL;

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  ctx->taskType = qwMsg->msgInfo.taskType;
  ctx->explain = qwMsg->msgInfo.explain;
  ctx->needFetch = qwMsg->msgInfo.needFetch;
  ctx->msgType = qwMsg->msgType;
  ctx->localExec = false;

  // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);

  code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
  if (TSDB_CODE_SUCCESS != code) {
    // Any decode failure is reported to the client as an invalid message.
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
  // From here on sql must not be freed by this function, whatever the result.
  sql = NULL;
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  ctx->level = plan->level;
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

  qwSaveTbVersionInfo(pTaskInfo, ctx);
  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));

_return:

  taosMemoryFree(sql);

  input.code = code;
  input.msgType = qwMsg->msgType;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
617
/**
 * Process a "continue query" message: keep executing the task and, when a
 * fetch is pending, answer it from the sink.
 *
 * Every round runs the pre-phase checks, executes the task, and - if a fetch
 * event has been received - builds and sends the fetch response over the
 * task's data connection. The loop ends when execution reported a stop, when
 * an error occurred, or when nobody asked for the query to continue; in that
 * case the phase is reset to 0 under the context lock. The last code is then
 * passed to qwHandlePostPhaseEvents (QW_PHASE_POST_CQUERY).
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  int32_t       code = 0;
  int32_t       dataLen = 0;
  bool          queryStop = false;
  void         *rsp = NULL;
  SQWTaskCtx   *ctx = NULL;
  SQWPhaseInput input = {0};

  while (true) {
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));

    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

    atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t *)&ctx->queryContinue, 0);

    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));

    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      SOutputData sOutput = {0};
      QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));

      bool bufRunningDry = (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus);
      if (!sOutput.queryEnd && bufRunningDry) {
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));

        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
      }

      if (NULL == rsp) {
        // Nothing to send yet: go around again to produce data for the fetch.
        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
      } else {
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);

        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
        if (qComplete) {
          atomic_store_8((int8_t *)&ctx->queryEnd, true);
        }

        qwMsg->connInfo = ctx->dataConnInfo;
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

        qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
        rsp = NULL;

        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
                     tstrerror(code), dataLen);
      }
    }

  _return:

    // No context at all (first round failed early): nothing to answer or unlock.
    if (NULL == ctx) {
      break;
    }

    if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      // A pending fetch must still be answered, now with the error code.
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
      qwFreeFetchRsp(rsp);
      rsp = NULL;

      qwMsg->connInfo = ctx->dataConnInfo;
      qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code);
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                   0);
    }

    QW_LOCK(QW_WRITE, &ctx->lock);
    bool finished = (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue));
    if (finished) {
      // Note: query is not running anymore
      QW_SET_PHASE(ctx, 0);
    }
    QW_UNLOCK(QW_WRITE, &ctx->lock);

    if (finished) {
      break;
    }
  }

  input.code = code;
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);

  QW_RET(TSDB_CODE_SUCCESS);
}
D
dapan1121 已提交
698

D
dapan1121 已提交
699
/**
 * Process a fetch message for a task.
 *
 * After the pre-phase checks the message's connection is stored as the task's
 * data connection and whatever the sink currently holds is gathered into a
 * response. With no data available the fetch is only recorded as a received
 * event. If the query has not ended and the sink buffer is low or empty, the
 * query is asked to continue: a running query just gets its continue flag
 * set, otherwise a continue-query message is queued unless one is already in
 * the queue.
 *
 * The post-phase handler decides the final code; a response is sent when
 * there is data to return or when that code is an error.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  SQWPhaseInput input = {0};
  SOutputData   sOutput = {0};
  SQWTaskCtx   *ctx = NULL;
  void         *rsp = NULL;
  int32_t       dataLen = 0;
  int32_t       code = 0;
  bool          locked = false;

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  ctx->msgType = qwMsg->msgType;
  ctx->dataConnInfo = qwMsg->connInfo;

  QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));

  if (rsp) {
    bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);

    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
    if (qComplete) {
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
    }
  } else {
    // Nothing to return yet: leave the fetch pending for the running query.
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
  }

  if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));

    QW_LOCK(QW_WRITE, &ctx->lock);
    locked = true;

    // RC WARNING
    if (QW_QUERY_RUNNING(ctx)) {
      atomic_store_8((int8_t *)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);

      atomic_store_8((int8_t *)&ctx->queryInQueue, 1);

      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
    }
  }

_return:

  // locked is only ever set after ctx was obtained, so ctx is valid here.
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);

  if (code) {
    // On failure only the error goes back; any gathered data is discarded.
    qwFreeFetchRsp(rsp);
    rsp = NULL;
    dataLen = 0;
  }

  if (code || rsp) {
    bool rsped = false;
    if (ctx) {
      qwDbgSimulateRedirect(qwMsg, ctx, &rsped);
      qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
    }

    if (!rsped) {
      qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
      QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
                   qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
    }
  }

  QW_RET(TSDB_CODE_SUCCESS);
}
D
dapan1121 已提交
776

D
dapan1121 已提交
777
/**
 * Process a drop request for one task.
 *
 * The task context is acquired and its lock is taken in write mode. If a drop
 * event is already flagged on the context the request is rejected with
 * TSDB_CODE_QRY_DUPLICATTED_OPERATION. Otherwise:
 *   - when the query is currently running, the task handle is killed, the task
 *     status is moved to JOB_TASK_STATUS_DROP and QW_EVENT_DROP is flagged;
 *   - when it is not running, the task is dropped immediately via qwDropTask().
 *
 * On failure the code is handed to QW_UPDATE_RSP_CODE (if a context was
 * acquired) and the task status is set to JOB_TASK_STATUS_FAIL.
 *
 * @param qwMsg  incoming message; not read by this function.
 * @return always TSDB_CODE_SUCCESS; the failure code is not propagated to the
 *         caller.
 */
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;
  bool        ctxLocked = false;

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);
  ctxLocked = true;

  if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG_E("task already dropping");
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

  if (QW_QUERY_RUNNING(ctx)) {
    // A running query cannot be dropped in place: kill it and leave a drop
    // event on the context.
    QW_ERR_JRET(qwKillTaskHandle(ctx));
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  } else {
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
  }

_return:

  if (code) {
    if (NULL != ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }

    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
  }

  if (ctxLocked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  if (NULL != ctx) {
    qwReleaseTaskCtx(mgmt, ctx);
  }

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
827
/**
 * Process a scheduler heartbeat request.
 *
 * A message carrying a non-zero code is treated as a broken heartbeat link and
 * delegated to qwProcessHbLinkBroken(). Otherwise the scheduler entry is
 * acquired (added if missing), the broken-link argument is registered for the
 * connection, and the connection info / endpoint id stored on the scheduler
 * are replaced with the ones from this request (releasing the previously
 * stored handle, if any). A heartbeat response echoing the endpoint id is
 * always sent; when processing failed, the request's connection handle is
 * released afterwards.
 *
 * @param mgmt   query worker instance.
 * @param qwMsg  incoming message (code and connection info are used).
 * @param req    decoded heartbeat request (sId and epId are used).
 * @return TSDB_CODE_SUCCESS on the normal path (the failure code is reported
 *         through the heartbeat response), or the result of
 *         qwProcessHbLinkBroken() for a broken-link notification.
 */
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t         code = 0;
  SSchedulerHbRsp rsp = {0};
  SQWSchStatus   *sch = NULL;
  bool            schAcquired = false;

  if (qwMsg->code) {
    QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
  }

  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
  schAcquired = true;

  QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));

  sch->hbBrokenTs = 0;

  QW_LOCK(QW_WRITE, &sch->hbConnLock);

  if (sch->hbConnInfo.handle) {
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
    sch->hbConnInfo.handle = NULL;
  }

  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));

  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);

  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
          req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);

  qwReleaseScheduler(QW_READ, mgmt);
  schAcquired = false;

_return:

  // An error after the scheduler was acquired (e.g. broken-link registration
  // failed) jumps here before the regular release; undo the acquisition so it
  // is not leaked.
  if (schAcquired) {
    qwReleaseScheduler(QW_READ, mgmt);
  }

  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));

  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);

  if (code) {
    tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
    qwMsg->connInfo.handle = NULL;
  }

  /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/

  QW_RET(TSDB_CODE_SUCCESS);
}

void qwProcessHbTimerEvent(void *param, void *tmrId) {
875
  SQWHbParam *hbParam = (SQWHbParam *)param;
D
dapan1121 已提交
876 877 878 879
  if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
    return;
  }

880
  int64_t   refId = hbParam->refId;
D
dapan1121 已提交
881 882 883 884 885
  SQWorker *mgmt = qwAcquire(refId);
  if (NULL == mgmt) {
    QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
    return;
  }
886

D
dapan1121 已提交
887
  SQWSchStatus *sch = NULL;
dengyihao's avatar
dengyihao 已提交
888
  int32_t       taskNum = 0;
889
  SQWHbInfo    *rspList = NULL;
D
dapan1121 已提交
890
  SArray       *pExpiredSch = NULL;
dengyihao's avatar
dengyihao 已提交
891
  int32_t       code = 0;
D
dapan1121 已提交
892

D
dapan1121 已提交
893 894
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
895 896 897 898 899
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
900
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
901
    qwRelease(refId);
D
dapan1121 已提交
902
    return;
D
dapan1121 已提交
903 904
  }

wafwerar's avatar
wafwerar 已提交
905
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
906 907
  pExpiredSch = taosArrayInit(schNum, sizeof(uint64_t));
  if (NULL == rspList || NULL == pExpiredSch) {
D
dapan1121 已提交
908
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
909 910
    taosMemoryFree(rspList);
    taosArrayDestroy(pExpiredSch);
D
dapan1121 已提交
911 912
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
913
    qwRelease(refId);
D
dapan1121 已提交
914
    return;
D
dapan1121 已提交
915 916
  }

917
  void   *key = NULL;
dengyihao's avatar
dengyihao 已提交
918
  size_t  keyLen = 0;
D
dapan1121 已提交
919
  int32_t i = 0;
D
dapan1121 已提交
920
  int64_t currentMs = taosGetTimestampMs();
D
dapan1121 已提交
921 922 923

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
H
Haojun Liao 已提交
924 925
    SQWSchStatus *sch1 = (SQWSchStatus *)pIter;
    if (NULL == sch1->hbConnInfo.handle) {
D
dapan1121 已提交
926
      uint64_t *sId = taosHashGetKey(pIter, NULL);
D
dapan1121 已提交
927
      QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
D
dapan1121 已提交
928

H
Haojun Liao 已提交
929 930
      if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
          taosHashGetSize(sch1->tasksHash) <= 0) {
D
dapan1121 已提交
931 932
        taosArrayPush(pExpiredSch, sId);
      }
933

D
dapan1121 已提交
934 935 936
      pIter = taosHashIterate(mgmt->schHash, pIter);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
937

D
dapan1121 已提交
938 939 940 941 942 943 944 945 946 947 948 949 950 951 952
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
953
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
954 955
    /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
    /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
D
dapan1121 已提交
956
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
957 958
  }

D
dapan1121 已提交
959
  if (taosArrayGetSize(pExpiredSch) > 0) {
D
dapan1121 已提交
960
    qwClearExpiredSch(mgmt, pExpiredSch);
D
dapan1121 已提交
961 962
  }

wafwerar's avatar
wafwerar 已提交
963
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
964
  taosArrayDestroy(pExpiredSch);
D
dapan1121 已提交
965

dengyihao's avatar
dengyihao 已提交
966
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
967
  qwRelease(refId);
D
dapan1121 已提交
968 969
}

970
/**
 * Execute a delete sub-plan synchronously and collect its result.
 *
 * The message payload is decoded into a sub-plan, an executor task and data
 * sink are created for it, the task is run through qwExecTask() using a
 * stack-local task context, and the delete result is read from the sink into
 * pRes. The local context is released with qwFreeTaskCtx() on every path.
 *
 * @param qwMsg  incoming message; msg/msgLen hold the encoded sub-plan and
 *               node is passed to qCreateExecTask().
 * @param pRes   output: filled by qwGetDeleteResFromSink() on success.
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_MSG when the plan
 *         cannot be decoded; TSDB_CODE_QRY_APP_ERROR when task creation yields
 *         no task/sink handle; otherwise the code of the failing step.
 */
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
  int32_t        code = 0;
  SSubplan      *plan = NULL;
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx     ctx = {0};

  code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
  if (TSDB_CODE_SUCCESS != code) {
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  ctx.taskHandle = pTaskInfo;
  ctx.sinkHandle = sinkHandle;

  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL));

  QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, pRes));

_return:

  qwFreeTaskCtx(&ctx);

  // Propagate the failure: nothing else in this function reports `code`, so
  // returning success here would hide every error from the caller.
  QW_RET(code);
}

D
dapan1121 已提交
1009 1010
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
  if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
D
dapan1121 已提交
1011 1012 1013
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
S
Shengliang 已提交
1014

D
dapan1121 已提交
1015 1016 1017 1018
  int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
  if (1 == qwNum) {
    memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
  }
D
dapan1121 已提交
1019 1020 1021 1022 1023 1024 1025 1026

  int32_t code = qwOpenRef();
  if (code) {
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
    QW_RET(code);
  }

  SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
D
dapan1121 已提交
1027
  if (NULL == mgmt) {
D
dapan1121 已提交
1028 1029
    qError("calloc %d failed", (int32_t)sizeof(SQWorker));
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1030
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1031 1032
  }

D
dapan1121 已提交
1033 1034 1035
  mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
  mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
  mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1036

dengyihao's avatar
dengyihao 已提交
1037 1038
  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                               HASH_ENTRY_LOCK);
D
dapan1121 已提交
1039
  if (NULL == mgmt->schHash) {
wafwerar's avatar
wafwerar 已提交
1040
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1041
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
D
dapan1121 已提交
1042
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1043 1044
  }

dengyihao's avatar
dengyihao 已提交
1045 1046
  mgmt->ctxHash =
      taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1047
  if (NULL == mgmt->ctxHash) {
D
dapan1121 已提交
1048
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
D
dapan1121 已提交
1049 1050 1051 1052 1053 1054 1055 1056 1057
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1058 1059
  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
D
dapan1121 已提交
1060 1061 1062 1063 1064
  if (pMsgCb) {
    mgmt->msgCb = *pMsgCb;
  } else {
    memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb));
  }
D
dapan1121 已提交
1065

D
dapan1121 已提交
1066 1067 1068 1069 1070 1071
  mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
  if (mgmt->refId < 0) {
    qError("taosAddRef qw failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(terrno);
  }

D
dapan1121 已提交
1072 1073 1074
  SQWHbParam *param = NULL;
  qwSetHbParam(mgmt->refId, &param);

1075
  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer);
D
dapan1121 已提交
1076 1077 1078 1079
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
1080

D
dapan1121 已提交
1081 1082
  *qWorkerMgmt = mgmt;

S
Shengliang Guan 已提交
1083
  qDebug("qworker initialized, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
D
dapan1121 已提交
1084

D
dapan1121 已提交
1085
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1086 1087 1088

_return:

D
dapan1121 已提交
1089 1090 1091 1092 1093 1094 1095
  if (mgmt->refId >= 0) {
    qwRelease(mgmt->refId);
  } else {
    taosHashCleanup(mgmt->schHash);
    taosHashCleanup(mgmt->ctxHash);
    taosTmrCleanUp(mgmt->timer);
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1096

1097
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1098
  }
1099

D
dapan1121 已提交
1100
  QW_RET(code);
D
dapan1121 已提交
1101
}
D
dapan1121 已提交
1102 1103 1104 1105

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1106
  }
D
dapan 已提交
1107

D
dapan1121 已提交
1108
  int32_t destroyed = 0;
D
dapan1121 已提交
1109
  SQWorker *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1110 1111
  mgmt->destroyed = &destroyed;
  
D
dapan1121 已提交
1112 1113
  if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
    qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
D
dapan1121 已提交
1114 1115 1116 1117 1118
    return;
  }

  while (0 == destroyed) {
    taosMsleep(2);
D
dapan1121 已提交
1119
  }
D
dapan1121 已提交
1120
}
D
dapan1121 已提交
1121

D
dapan1121 已提交
1122 1123 1124 1125 1126
/**
 * Fill pStat with a snapshot of the query worker's statistics.
 *
 * Collected values: data-sink cache size, the per-message-type processed
 * counters kept in mgmt->stat.msgStat, the current query/fetch queue sizes
 * (obtained through the handle's qsizeFp callback) and the time-in-queue
 * figures returned by qwGetTimeInQueue().
 *
 * @param handle       read handle; its pMsgCb callbacks are used to query the
 *                     queue sizes.
 * @param qWorkerMgmt  query worker instance (SQWorker *).
 * @param pStat        output statistics.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_INVALID_INPUT if any argument is
 *         NULL.
 */
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat) {
  if (NULL == handle || NULL == qWorkerMgmt || NULL == pStat) {
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;

  // Data-sink cache usage.
  SDataSinkStat sinkStat = {0};
  dsDataSinkGetCacheSize(&sinkStat);
  pStat->cacheDataSize = sinkStat.cachedSize;

  // Processed-message counters.
  pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed);
  pStat->cqueryProcessed = QW_STAT_GET(mgmt->stat.msgStat.cqueryProcessed);
  pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed);
  pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed);
  pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed);
  pStat->deleteProcessed = QW_STAT_GET(mgmt->stat.msgStat.deleteProcessed);

  // Queue depth and queueing time.
  pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE);
  pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE);
  pStat->timeInQueryQueue = qwGetTimeInQueue(mgmt, QUERY_QUEUE);
  pStat->timeInFetchQueue = qwGetTimeInQueue(mgmt, FETCH_QUEUE);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1147

D
dapan1121 已提交
1148 1149
/**
 * Run a query sub-plan locally (no RPC round trip).
 *
 * Registers the task context and its initial status, runs the pre-query phase
 * handling, copies the task attributes from qwMsg into the context and marks
 * it as a local execution, then creates the executor task / data sink and
 * executes it. The post-query phase handling always runs; its result becomes
 * the return value and is also handed to QW_UPDATE_RSP_CODE when a context
 * was acquired.
 *
 * @param pMgmt       query worker instance (SQWorker *).
 * @param sId,qId,tId,rId,eId  scheduler / query / task / ref / exec ids
 *                    identifying the task.
 * @param qwMsg       message describing the task; qwMsg->msg is used directly
 *                    as the SSubplan and connInfo.handle as the client RPC
 *                    handle.
 * @param explainRes  array stored on the context as ctx->explainRes.
 * @return the code produced by qwHandlePostPhaseEvents().
 */
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes) {
  SQWorker      *mgmt = (SQWorker*)pMgmt;
  int32_t        code = 0;
  SQWTaskCtx    *ctx = NULL;
  SSubplan      *plan = (SSubplan *)qwMsg->msg;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SReadHandle    rHandle = {0};

  QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));
  QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  ctx->taskType = qwMsg->msgInfo.taskType;
  ctx->explain = qwMsg->msgInfo.explain;
  ctx->needFetch = qwMsg->msgInfo.needFetch;
  ctx->msgType = qwMsg->msgType;
  ctx->localExec = true;
  ctx->explainRes = explainRes;

  // Temporary callback table that only carries the client RPC handle; it is
  // freed at _return.
  rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
  if (NULL == rHandle.pMsgCb) {
    QW_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgCb));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;

  code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  ctx->level = plan->level;
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));

_return:

  taosMemoryFree(rHandle.pMsgCb);

  input.code = code;
  input.msgType = qwMsg->msgType;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);

  if (ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  QW_RET(code);
}

D
dapan1121 已提交
1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254
/**
 * Fetch the next result block of a locally executed task.
 *
 * After the pre-fetch phase handling, the task context is looked up and the
 * data sink is polled. As long as the sink yields no response buffer the task
 * is executed again and the sink is polled once more. When a buffer is
 * available the fetch response is built into it, and ctx->queryEnd is set if
 * the sink is empty and the query has ended.
 *
 * The response pointer (possibly NULL on failure) is always written to *pRsp
 * and the post-fetch phase handling always runs.
 *
 * @param pMgmt       query worker instance (SQWorker *).
 * @param sId,qId,tId,rId,eId  scheduler / query / task / ref / exec ids
 *                    identifying the task.
 * @param pRsp        output: receives the response buffer.
 * @param explainRes  array stored on the context as ctx->explainRes.
 * @return the code produced by qwHandlePostPhaseEvents().
 */
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes) {
  SQWorker     *mgmt = (SQWorker *)pMgmt;
  SQWTaskCtx   *ctx = NULL;
  SQWPhaseInput input = {0};
  SOutputData   sOutput = {0};
  void         *rsp = NULL;
  int32_t       dataLen = 0;
  int32_t       code = 0;
  bool          queryStop = false;

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  ctx->msgType = TDMT_SCH_MERGE_FETCH;
  ctx->explainRes = explainRes;

  // Poll the sink; whenever it has nothing to hand out, run the task again.
  for (;;) {
    QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
    if (NULL != rsp) {
      break;
    }

    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
  }

  {
    // Only reached with a non-NULL response buffer.
    bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);

    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
    if (qComplete) {
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
    }
  }

_return:

  *pRsp = rsp;

  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);

  QW_RET(code);
}

D
dapan1121 已提交
1255