qworker.c 40.3 KB
Newer Older
1 2
#include "qworker.h"

dengyihao's avatar
dengyihao 已提交
3
#include "dataSinkMgt.h"
4
#include "executor.h"
D
dapan1121 已提交
5
#include "planner.h"
H
Haojun Liao 已提交
6
#include "query.h"
D
dapan1121 已提交
7 8
#include "qwInt.h"
#include "qwMsg.h"
dengyihao's avatar
dengyihao 已提交
9
#include "tcommon.h"
H
Hongze Cheng 已提交
10
#include "tdatablock.h"
dengyihao's avatar
dengyihao 已提交
11
#include "tglobal.h"
H
Haojun Liao 已提交
12
#include "tmsg.h"
13
#include "tname.h"
D
dapan1121 已提交
14

D
dapan1121 已提交
15
SQWorkerMgmt gQwMgmt = {
16 17 18
    .lock = 0,
    .qwRef = -1,
    .qwNum = 0,
D
dapan1121 已提交
19
};
20

21 22 23 24 25
/**
 * Force-stop every task context stored in mgmt->ctxHash.
 *
 * Each context is handled under its own write lock:
 *   - a task with a received or processed DROP event is left alone;
 *   - a task whose query is running is killed with TSDB_CODE_VND_STOPPED;
 *   - a task whose fetch is running gets TSDB_CODE_VND_STOPPED recorded as its
 *     response code and the DROP event marked as received;
 *   - any other task is dropped immediately.
 *
 * @param mgmt  query worker whose task contexts are walked
 * @return always TSDB_CODE_SUCCESS
 */
int32_t qwStopAllTasks(SQWorker *mgmt) {
  /* These names are consumed by the QW_* logging/parameter macros below. */
  uint64_t qId, tId, sId;
  int32_t  eId;
  int64_t  rId = 0;

  for (void *pIter = taosHashIterate(mgmt->ctxHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->ctxHash, pIter)) {
    SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
    void       *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, tId, eId);

    QW_LOCK(QW_WRITE, &ctx->lock);

    sId = ctx->sId;

    QW_TASK_DLOG_E("start to force stop task");

    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
      /* A drop is already pending or done; nothing more to do for this task. */
      QW_TASK_WLOG_E("task already dropping");
    } else if (QW_QUERY_RUNNING(ctx)) {
      qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
      QW_TASK_DLOG_E("task running, async killed");
    } else if (QW_FETCH_RUNNING(ctx)) {
      QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
      QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
      QW_TASK_DLOG_E("task fetching, update drop received");
    } else {
      qwDropTask(QW_FPARAMS());
    }

    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
65 66 67 68
/**
 * React to a broken heartbeat connection reported for scheduler req->sId.
 *
 * Records the time of the break on the scheduler status and, when the broken
 * connection is the one currently stored as the scheduler's heartbeat
 * connection, releases that handle and clears the stored handle pointers.
 * A break reported for any other handle is only logged.
 *
 * @param mgmt   query worker owning the scheduler table
 * @param qwMsg  message whose connInfo.handle identifies the broken connection
 * @param req    heartbeat request carrying the scheduler id
 * @return error from qwAcquireScheduler, otherwise TSDB_CODE_SUCCESS
 */
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  SQWSchStatus *sch = NULL;

  QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch));

  QW_LOCK(QW_WRITE, &sch->hbConnLock);

  sch->hbBrokenTs = taosGetTimestampMs();

  if (qwMsg->connInfo.handle != sch->hbConnInfo.handle) {
    /* The stored heartbeat handle differs from the broken one: keep it. */
    QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
  } else {
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
    sch->hbConnInfo.handle = NULL;
    sch->hbConnInfo.ahandle = NULL;

    QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
  }

  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);

  qwReleaseScheduler(QW_READ, mgmt);

  QW_RET(TSDB_CODE_SUCCESS);
}

93 94
static void freeItem(void *param) {
  SExplainExecInfo *pInfo = param;
H
Haojun Liao 已提交
95 96 97
  taosMemoryFree(pInfo->verboseInfo);
}

D
dapan1121 已提交
98
/**
 * Finish bookkeeping once a task's executor has produced all of its output.
 *
 * Marks the context as done (ctx->queryExecDone). For a TASK_TYPE_TEMP task
 * that still has a task handle:
 *   - when ctx->explain is set, the executor's explain info is collected and
 *     either appended to ctx->explainRes (local execution) or sent back over
 *     the control connection;
 *   - when the task needs no fetch, the sink's data length is stored in
 *     ctx->affectedRows.
 *
 * The temporary explain list is released on every path, including failures.
 *
 * @param ctx  task context (QW_FPARAMS_DEF supplies mgmt/sId/qId/tId/rId/eId)
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or
 *         the error from qGetExplainExecInfo / qwBuildAndSendExplainRsp
 */
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  qTaskInfo_t taskHandle = ctx->taskHandle;

  ctx->queryExecDone = true;

  if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
    if (ctx->explain) {
      SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
      if (NULL == execInfoList) {
        QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      int32_t code = qGetExplainExecInfo(taskHandle, execInfoList);
      if (code) {
        /* The list may already hold entries; free them together with the list. */
        taosArrayDestroyEx(execInfoList, freeItem);
        QW_ERR_RET(code);
      }

      if (ctx->localExec) {
        SExplainLocalRsp localRsp = {0};
        localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
        SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
        if (localRsp.rsp.numOfPlans > 0) {
          if (NULL == pExec) {
            taosArrayDestroyEx(execInfoList, freeItem);
            QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
          }

          /* Shallow copy: ownership of each verboseInfo moves to pExec. */
          memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
        }

        localRsp.rsp.subplanInfo = pExec;
        localRsp.qId = qId;
        localRsp.tId = tId;
        localRsp.rId = rId;
        localRsp.eId = eId;
        if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) {
          /* Nobody took ownership of the copy: drop it and the originals. */
          taosMemoryFree(pExec);
          taosArrayDestroyEx(execInfoList, freeItem);
          QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
        }

        /* Entries now belong to ctx->explainRes; free only the container. */
        taosArrayDestroy(execInfoList);
      } else {
        SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
        connInfo.ahandle = NULL;
        code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
        taosArrayDestroyEx(execInfoList, freeItem);
        QW_ERR_RET(code);
      }
    }

    if (!ctx->needFetch) {
      dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL);
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
137 138
/**
 * Send the query response for a task and mark the context as responded.
 *
 * A "quick" response (quickRsp == true) is only sent when the global
 * tsQueryRspPolicy is QUERY_RSP_POLICY_QUICK; a normal response is always
 * handled. For locally executed tasks no message is sent, but the context is
 * still flagged via ctx->queryRsped.
 *
 * @param msgType   response message type
 * @param ctx       task context
 * @param rspCode   result code carried by the response
 * @param quickRsp  whether this is the early (quick) response attempt
 * @return always TSDB_CODE_SUCCESS
 */
int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) {
  /* Quick responses are suppressed unless the quick policy is active. */
  if (quickRsp && QUERY_RSP_POLICY_QUICK != tsQueryRspPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  if (!ctx->localExec) {
    qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx);
    QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode));
  }

  ctx->queryRsped = true;

  return TSDB_CODE_SUCCESS;
}

150
/**
 * Drive the task's executor and push every produced block into the data sink.
 *
 * Each loop round runs the executor once (when a task handle exists), puts
 * all returned blocks into the sink, and then decides whether to stop:
 *   - no blocks returned, or the executor reports no more data: the sink is
 *     closed with dsEndPut, completion is handled and *queryStop is set;
 *   - the sink asked not to continue: *queryStop is set;
 *   - a fetch-needing task that has not been responded to has run
 *     QW_DEFAULT_SHORT_RUN_TIMES rounds;
 *   - a FETCH event has been received;
 *   - ctx->rspCode became non-zero.
 *
 * @param ctx        task context (QW_FPARAMS_DEF supplies mgmt and the ids)
 * @param queryStop  optional out flag, set to true when the query should not
 *                   be continued; may be NULL
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the result list
 *         cannot be allocated, or the first error from the executor, the sink
 *         or qwHandleTaskComplete
 */
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
  int32_t        code = 0;
  bool           qcontinue = true;
  uint64_t       useconds = 0;
  int32_t        i = 0;
  int32_t        execNum = 0;
  qTaskInfo_t    taskHandle = ctx->taskHandle;
  DataSinkHandle sinkHandle = ctx->sinkHandle;
  SLocalFetch    localFetch = {(void *)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes};

  if (ctx->queryExecDone) {
    if (queryStop) {
      *queryStop = true;
    }

    return TSDB_CODE_SUCCESS;
  }

  SArray *pResList = taosArrayInit(4, POINTER_BYTES);
  if (NULL == pResList) {
    /* Nothing has been acquired yet, so a plain error return is enough. */
    QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  while (true) {
    QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);

    // if *taskHandle is NULL, it's killed right now
    bool hasMore = false;

    if (taskHandle) {
      qwDbgSimulateSleep();

      code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch);
      if (code) {
        /* "Not supported" is logged at debug level only; everything else is an error. */
        if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
          QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        } else {
          QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        }
        QW_ERR_JRET(code);
      }
    }

    ++execNum;

    size_t numOfResBlock = taosArrayGetSize(pResList);
    for (size_t j = 0; j < numOfResBlock; ++j) {
      SSDataBlock *pRes = taosArrayGetP(pResList, j);

      SInputData inputData = {.pData = pRes};
      code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
      if (code) {
        QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
        QW_ERR_JRET(code);
      }

      QW_TASK_DLOG("data put into sink, rows:%" PRId64 ", continueExecTask:%d", pRes->info.rows, qcontinue);
    }

    if (numOfResBlock == 0 || (hasMore == false)) {
      if (numOfResBlock == 0) {
        QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
      } else {
        QW_TASK_DLOG("qExecTask done, useconds:%" PRIu64, useconds);
      }

      dsEndPut(sinkHandle, useconds);
      QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));

      if (queryStop) {
        *queryStop = true;
      }

      break;
    }

    if (!qcontinue) {
      if (queryStop) {
        *queryStop = true;
      }

      break;
    }

    /* Yield after a short run so the pending query response can be sent. */
    if (ctx->needFetch && (!ctx->queryRsped) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
      break;
    }

    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      break;
    }

    if (atomic_load_32(&ctx->rspCode)) {
      break;
    }
  }

_return:
  taosArrayDestroy(pResList);
  QW_RET(code);
}
D
dapan1121 已提交
247

248
/**
 * Tell whether the task's executor is currently idle.
 *
 * @param ctx  task context
 * @return true when the context has no task handle or qTaskIsExecuting reports
 *         that the handle is not executing; false otherwise
 */
bool qwTaskNotInExec(SQWTaskCtx *ctx) {
  qTaskInfo_t taskHandle = ctx->taskHandle;

  return (NULL == taskHandle) || !qTaskIsExecuting(taskHandle);
}

D
dapan1121 已提交
257

D
dapan1121 已提交
258
/**
 * Fill a heartbeat response for one scheduler.
 *
 * Copies the scheduler's heartbeat connection info and endpoint id into hbInfo
 * and, under the scheduler's task read lock, builds hbInfo->rsp.taskStatus
 * with one STaskStatus (query/task/exec id, status, refId) per entry of
 * sch->tasksHash.
 *
 * @param mgmt    query worker (used by the logging macros)
 * @param sch     scheduler status to report on
 * @param hbInfo  heartbeat info to fill
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the status array
 *         cannot be created
 */
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
  hbInfo->connInfo = sch->hbConnInfo;
  hbInfo->rsp.epId = sch->hbEpId;

  QW_LOCK(QW_READ, &sch->tasksLock);

  int32_t taskNum = taosHashGetSize(sch->tasksHash);

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
    QW_UNLOCK(QW_READ, &sch->tasksLock);
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  size_t      keyLen = 0;
  STaskStatus status = {0};

  for (void *pIter = taosHashIterate(sch->tasksHash, NULL); pIter != NULL;
       pIter = taosHashIterate(sch->tasksHash, pIter)) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
    void          *key = taosHashGetKey(pIter, &keyLen);

    // TODO GET EXECUTOR API TO GET MORE INFO

    /* The hash key encodes the query, task and execution ids. */
    QW_GET_QTID(key, status.queryId, status.taskId, status.execId);
    status.status = taskStatus->status;
    status.refId = taskStatus->refId;

    taosArrayPush(hbInfo->rsp.taskStatus, &status);
  }

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
302
/**
 * Pull as much buffered result data as appropriate out of the task's data
 * sink and pack it into a single fetch response buffer.
 *
 * The loop keeps appending sink blocks to the response until one of:
 *   - the sink reports zero length (buffer empty, or the query has ended);
 *   - the sink is empty and the query has ended after a block was read;
 *   - the task is at level 0 (one block per response);
 *   - at least QW_MIN_RES_ROWS rows have been collected.
 * The task status is switched to JOB_TASK_STATUS_SUCC when the query has ended
 * and all data has been taken.
 *
 * @param ctx      task context (QW_FPARAMS_DEF supplies mgmt and the ids)
 * @param dataLen  out: number of payload bytes placed in the response; always
 *                 reset to 0 on entry
 * @param rspMsg   out: response buffer; written only on the normal success
 *                 path (left untouched on the NULL-sink shortcut and on error)
 * @param pOutput  in/out: aggregated sink output description
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a negative sink
 *         length, or the error from dsGetDataBlock / qwMallocFetchRsp
 */
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
  int64_t            len = 0;
  SRetrieveTableRsp *rsp = NULL;
  bool               queryEnd = false;
  int32_t            code = 0;
  SOutputData        output = {0};

  /* Reset first so the length is well defined on every return path,
   * including the NULL-sink shortcut below. */
  *dataLen = 0;

  if (NULL == ctx->sinkHandle) {
    pOutput->queryEnd = true;
    return TSDB_CODE_SUCCESS;
  }

  /* NOTE(review): on the error returns inside this loop a previously allocated
   * rsp is neither freed nor handed back through *rspMsg - looks like a leak;
   * confirm against qwMallocFetchRsp and the callers' cleanup. */
  while (true) {
    dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);

    if (len < 0) {
      QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len);
      QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    if (len == 0) {
      if (queryEnd) {
        code = dsGetDataBlock(ctx->sinkHandle, &output);
        if (code) {
          QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
          QW_ERR_RET(code);
        }

        QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
                     pOutput->numOfRows);

        qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
        if (NULL == rsp) {
          /* Nothing collected so far: still hand back a header-only response. */
          QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp));
          *pOutput = output;
        } else {
          pOutput->queryEnd = output.queryEnd;
          pOutput->bufStatus = output.bufStatus;
          pOutput->useconds = output.useconds;
        }

        break;
      }

      pOutput->bufStatus = DS_BUF_EMPTY;

      break;
    }

    // Got data from sink
    QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64 "", len);

    *dataLen += len;

    QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp));

    /* Append the new block right after the data already in the buffer. */
    output.pData = rsp->data + *dataLen - len;
    code = dsGetDataBlock(ctx->sinkHandle, &output);
    if (code) {
      QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
    }

    pOutput->queryEnd = output.queryEnd;
    pOutput->precision = output.precision;
    pOutput->bufStatus = output.bufStatus;
    pOutput->useconds = output.useconds;
    pOutput->compressed = output.compressed;
    pOutput->numOfCols = output.numOfCols;
    pOutput->numOfRows += output.numOfRows;
    pOutput->numOfBlocks++;

    if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
      QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
                   pOutput->numOfRows);
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
      break;
    }

    if (0 == ctx->level) {
      QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 ", level %d", pOutput->numOfBlocks, pOutput->numOfRows,
                   ctx->level);
      break;
    }

    if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
      QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks,
                   pOutput->numOfRows);
      break;
    }
  }

  *rspMsg = rsp;

  return TSDB_CODE_SUCCESS;
}

401
/**
 * Read the single delete result record from the task's data sink and copy it
 * into pRes.
 *
 * The sink must report exactly sizeof(SDeleterRes) bytes. The record is read
 * into a temporary buffer, its fields are copied to pRes (with ctimeMs set to
 * the current timestamp), and the buffer is released again.
 *
 * @param ctx   task context (QW_FPARAMS_DEF supplies mgmt and the ids)
 * @param pRes  out: delete result
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for an unexpected
 *         sink length, TSDB_CODE_OUT_OF_MEMORY, or the dsGetDataBlock error
 */
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
  int64_t     len = 0;
  bool        queryEnd = false;
  SOutputData sinkOut = {0};

  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);

  if (len <= 0 || len != sizeof(SDeleterRes)) {
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  sinkOut.pData = taosMemoryCalloc(1, len);
  if (NULL == sinkOut.pData) {
    QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t code = dsGetDataBlock(ctx->sinkHandle, &sinkOut);
  if (code) {
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
    taosMemoryFree(sinkOut.pData);
    QW_ERR_RET(code);
  }

  SDeleterRes *pSrc = (SDeleterRes *)sinkOut.pData;

  pRes->suid = pSrc->suid;
  pRes->uidList = pSrc->uidList;
  pRes->ctimeMs = taosGetTimestampMs();
  pRes->skey = pSrc->skey;
  pRes->ekey = pSrc->ekey;
  pRes->affectedRows = pSrc->affectedRows;
  /* NOTE(review): unbounded copies - assumes the destination fields are at
   * least as large as the source fields; confirm against the struct layouts. */
  strcpy(pRes->tableFName, pSrc->tableName);
  strcpy(pRes->tsColName, pSrc->tsColName);

  taosMemoryFree(sinkOut.pData);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
441 442

/**
 * Under the quick response policy, answer an already received fetch request
 * straight away with whatever data the sink currently holds.
 *
 * Nothing happens unless tsQueryRspPolicy is QUERY_RSP_POLICY_QUICK, ctx is
 * non-NULL, a FETCH event has been received, and the sink yields a response
 * buffer. When a response is built, the FETCH event is marked processed, the
 * reply is sent on the task's data connection, and ctx->queryEnd is set if the
 * sink is empty and the query has ended.
 *
 * @param ctx    task context, may be NULL
 * @param qwMsg  message whose connInfo is overwritten with ctx->dataConnInfo
 * @param code   result code to carry in the fetch response
 * @return always TSDB_CODE_SUCCESS (a failure to read the sink is ignored)
 */
int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32_t code) {
  if (QUERY_RSP_POLICY_QUICK != tsQueryRspPolicy || NULL == ctx) {
    return TSDB_CODE_SUCCESS;
  }

  if (!QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
    return TSDB_CODE_SUCCESS;
  }

  void       *rsp = NULL;
  int32_t     dataLen = 0;
  SOutputData sOutput = {0};
  if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == rsp) {
    return TSDB_CODE_SUCCESS;
  }

  bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);

  qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
  if (qComplete) {
    atomic_store_8((int8_t *)&ctx->queryEnd, true);
  }

  qwMsg->connInfo = ctx->dataConnInfo;
  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

  qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
  rsp = NULL;

  QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
               dataLen);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
475
/**
 * Gatekeeper run before a query, fetch or continue-query phase starts.
 *
 * Acquires and write-locks the task context, records the new phase, and then
 * rejects the request when the task state does not allow it:
 *   - any phase: the query has already ended;
 *   - PRE_QUERY: the task was already dropped; a pending drop is executed
 *     here; otherwise the task status moves to JOB_TASK_STATUS_EXEC;
 *   - PRE_FETCH: the task is dropping/dropped, a previous fetch is still
 *     pending, the task already failed, or the query response has not been
 *     sent yet;
 *   - PRE_CQUERY: the task was dropped or already failed; a pending drop is
 *     executed here.
 * The resulting code is folded into the context's response code before the
 * context is unlocked and released.
 *
 * @param phase   one of QW_PHASE_PRE_QUERY / PRE_FETCH / PRE_CQUERY
 * @param input   unused here
 * @param output  unused here
 * @return TSDB_CODE_SUCCESS when the phase may proceed, otherwise the error
 */
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;

  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);

  QW_SET_PHASE(ctx, phase);

  if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }

  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
      }

      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        /* Carry out the pending drop, then report the task's stored code. */
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
        QW_ERR_JRET(ctx->rspCode);
      }

      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
      break;
    }

    case QW_PHASE_PRE_FETCH: {
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
        QW_ERR_JRET(ctx->rspCode);
      }

      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

      if (ctx->rspCode) {
        QW_TASK_ELOG("task already failed cause of %s, phase:%s", tstrerror(ctx->rspCode), qwPhaseStr(phase));
        QW_ERR_JRET(ctx->rspCode);
      }

      if (!ctx->queryRsped) {
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
      }
      break;
    }

    case QW_PHASE_PRE_CQUERY: {
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
        QW_ERR_JRET(ctx->rspCode);
      }

      if (ctx->rspCode) {
        QW_TASK_ELOG("task already failed cause of %s, phase:%s", tstrerror(ctx->rspCode), qwPhaseStr(phase));
        QW_ERR_JRET(ctx->rspCode);
      }

      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        /* Carry out the pending drop, then report the task's stored code. */
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
        QW_ERR_JRET(ctx->rspCode);
      }

      break;
    }

    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  if (ctx->rspCode) {
    QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode));
    QW_ERR_JRET(ctx->rspCode);
  }

_return:

  /* ctx stays NULL when the context could not be acquired. */
  if (ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);

    QW_UNLOCK(QW_WRITE, &ctx->lock);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  if (TSDB_CODE_SUCCESS == code) {
    QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  } else {
    QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  }

  QW_RET(code);
}

/**
 * Bookkeeping run after a query, fetch or continue-query phase has finished.
 *
 * With the task context acquired and write-locked it:
 *   - stops with the stored response code if the task was already dropped;
 *   - executes a pending drop, except after a fetch while the query is still
 *     running or the executor is still busy;
 *   - picks up an earlier task failure, then the phase's own result
 *     (input->code).
 * Afterwards, for POST_QUERY, a successful task is moved to
 * JOB_TASK_STATUS_PART_SUCC and the query response is sent if it has not been
 * sent yet. The final code is folded into the context's response code, the
 * phase is recorded (except for POST_CQUERY), and a failed task is moved to
 * JOB_TASK_STATUS_FAIL.
 *
 * @param phase   one of the QW_PHASE_POST_* values
 * @param input   phase result: code and msgType are read
 * @param output  unused here
 * @return TSDB_CODE_SUCCESS or the resulting error code
 */
int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;

  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
    QW_ERR_JRET(ctx->rspCode);
  }

  /* After a fetch the drop is deferred while the query/executor is still active. */
  if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) &&
      (QW_PHASE_POST_FETCH != phase || ((!QW_QUERY_RUNNING(ctx)) && qwTaskNotInExec(ctx)))) {
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(ctx->rspCode);
  }

  if (ctx->rspCode) {
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
    QW_ERR_JRET(ctx->rspCode);
  }

  QW_ERR_JRET(input->code);

_return:

  /* code is only successful here when the context was acquired above. */
  if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
    ctx->queryGotData = true;
  }

  if (QW_PHASE_POST_QUERY == phase && ctx && !ctx->queryRsped) {
    bool   rsped = false;
    SQWMsg qwMsg = {.msgType = ctx->queryMsgType, .connInfo = ctx->ctrlConnInfo};
    qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
    qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
    if (!rsped) {
      qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false);
    }
  }

  if (ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);

    if (QW_PHASE_POST_CQUERY != phase) {
      QW_SET_PHASE(ctx, phase);
    }

    QW_UNLOCK(QW_WRITE, &ctx->lock);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
  }

  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));

  QW_RET(code);
}

D
dapan1121 已提交
651 652 653 654 655 656
/**
 * Undo a query pre-processing step by dropping the task.
 *
 * @return the error from qwDropTask, otherwise TSDB_CODE_SUCCESS
 */
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
  int32_t dropCode = qwDropTask(QW_FPARAMS());
  QW_ERR_RET(dropCode);

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
657
/**
 * Prepare a task for an incoming query message.
 *
 * Registers the broken-link argument for the connection, creates and acquires
 * the task context, stores the control connection and scheduler id in it,
 * registers the task with status JOB_TASK_STATUS_INIT, and attempts a quick
 * query response.
 *
 * Any failure is recorded in the context's response code (when a context
 * exists); the function itself always reports success to its caller.
 *
 * @param qwMsg  incoming query message (connInfo and msgType are read)
 * @return always TSDB_CODE_SUCCESS
 */
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  SQWTaskCtx *ctx = NULL;
  int32_t     code = 0;

  QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
  QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  ctx->phase = -1;
  ctx->sId = sId;
  ctx->ctrlConnInfo = qwMsg->connInfo;

  QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));

  qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true);

_return:

  if (ctx) {
    /* Keep the failure (if any) on the context for later phases to see. */
    QW_UPDATE_RSP_CODE(ctx, code);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  QW_RET(TSDB_CODE_SUCCESS);
}

685
/**
 * Handle a query message: build the execution task from the serialized
 * subplan and run it for the first time.
 *
 * Steps: pre-phase checks, copy the message attributes into the task context,
 * deserialize the subplan, create the executor task and data sink, publish
 * both handles on the context, and execute. Whatever the outcome, the
 * post-phase handler is run with the resulting code and a quick fetch response
 * is attempted afterwards.
 *
 * @param qwMsg  query message (msg/msgLen hold the serialized subplan)
 * @param sql    SQL text; passed to qCreateExecTask, after which this function
 *               no longer frees it. If the task is never created, it is freed
 *               here.
 * @return always TSDB_CODE_SUCCESS; failures are reported through the
 *         post-phase handling
 */
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
  int32_t        code = 0;
  SSubplan      *plan = NULL;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx    *ctx = NULL;

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  ctx->queryMsgType = qwMsg->msgType;
  ctx->taskType = qwMsg->msgInfo.taskType;
  ctx->explain = qwMsg->msgInfo.explain;
  ctx->needFetch = qwMsg->msgInfo.needFetch;
  ctx->localExec = false;

  if (TSDB_CODE_SUCCESS != qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan)) {
    /* Any deserialization failure is reported as an invalid message. */
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
  sql = NULL; /* handed to qCreateExecTask; must not be freed at _return */
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  ctx->level = plan->level;
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

  qwSaveTbVersionInfo(pTaskInfo, ctx);
  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));

_return:

  taosMemoryFree(sql);

  input.code = code;
  input.msgType = qwMsg->msgType;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);

  /* ctx may still be NULL here; qwQuickRspFetchReq is called regardless. */
  qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code);

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
747
/*
 * Handle a "continue query" message: keep executing the task and, whenever a
 * fetch request is pending (QW_EVENT_FETCH received), try to answer it from
 * the data sink.
 *
 * Each loop round runs the pre-CQUERY phase hook, looks the task context up
 * again, clears the queryInQueue/queryContinue flags, executes the task
 * (unless the previous round reported it stopped) and then serves a pending
 * fetch. The loop ends when the context cannot be obtained, when an error
 * occurred, when the query has ended, or when execution stopped and nobody
 * asked to continue.
 *
 * Always returns TSDB_CODE_SUCCESS; the last error code is handed to the
 * post-CQUERY phase hook through the phase input and, if a fetch is pending,
 * sent back in the fetch response.
 */
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  SQWTaskCtx   *ctx = NULL;
  int32_t       code = 0;
  SQWPhaseInput phaseInput = {0};
  void         *fetchRsp = NULL;
  int32_t       rspLen = 0;
  bool          execStopped = false;

  for (;;) {
    // a failed pre-phase must leave ctx NULL so the tail of the round bails out
    ctx = NULL;

    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &phaseInput, NULL));
    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

    atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t *)&ctx->queryContinue, 0);

    if (!execStopped) {
      QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &execStopped));
    }

    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      SOutputData sinkOut = {0};
      QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &rspLen, &fetchRsp, &sinkOut));

      const bool bufDrained = (DS_BUF_LOW == sinkOut.bufStatus) || (DS_BUF_EMPTY == sinkOut.bufStatus);
      if (bufDrained && !sinkOut.queryEnd) {
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sinkOut.bufStatus));

        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
      }

      if (NULL == fetchRsp) {
        // nothing to send yet: ask for another execution round
        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
      } else {
        const bool finished = (DS_BUF_EMPTY == sinkOut.bufStatus && sinkOut.queryEnd);

        qwBuildFetchRsp(fetchRsp, &sinkOut, rspLen, finished);
        if (finished) {
          atomic_store_8((int8_t *)&ctx->queryEnd, true);
          atomic_store_8((int8_t *)&ctx->queryContinue, 0);
        }

        // reply on the connection recorded in the task context for the fetch
        qwMsg->connInfo = ctx->dataConnInfo;
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

        qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, fetchRsp, rspLen, code);
        fetchRsp = NULL;

        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
                     tstrerror(code), rspLen);
      }
    }

  _return:

    if (NULL == ctx) {
      break;
    }

    if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      // a fetch is waiting but this round failed: answer it with the error
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
      qwFreeFetchRsp(fetchRsp);
      fetchRsp = NULL;

      qwMsg->connInfo = ctx->dataConnInfo;
      qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code);
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                   0);
    }

    QW_LOCK(QW_WRITE, &ctx->lock);
    const bool notRunning = atomic_load_8((int8_t *)&ctx->queryEnd) ||
                            (execStopped && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code;
    if (notRunning) {
      // Note: query is not running anymore
      QW_SET_PHASE(ctx, QW_PHASE_POST_CQUERY);
    }
    QW_UNLOCK(QW_WRITE, &ctx->lock);

    if (notRunning) {
      break;
    }

    execStopped = false;
  }

  phaseInput.code = code;
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &phaseInput, NULL);

  QW_RET(TSDB_CODE_SUCCESS);
}
D
dapan1121 已提交
836

D
dapan1121 已提交
837
/*
 * Handle a fetch request for a task.
 *
 * Records the fetch message type and connection in the task context, then
 * tries to pull a result from the data sink. If no response is available the
 * fetch is marked as received (QW_EVENT_FETCH) so it can be answered later;
 * otherwise the response is finalized here. When the query has not ended and
 * the sink buffer is low or empty, the query is asked to continue, either by
 * flagging the running query or by queueing a CQUERY message.
 *
 * A response is sent from this function when an error occurred or when data
 * is available. Always returns TSDB_CODE_SUCCESS.
 */
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  int32_t       code = 0;
  int32_t       rspLen = 0;
  bool          ctxLocked = false;
  SQWTaskCtx   *ctx = NULL;
  void         *fetchRsp = NULL;
  SQWPhaseInput phaseInput = {0};

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &phaseInput, NULL));
  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  ctx->fetchMsgType = qwMsg->msgType;
  ctx->dataConnInfo = qwMsg->connInfo;

  SOutputData sinkOut = {0};
  QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &rspLen, &fetchRsp, &sinkOut));

  if (NULL != fetchRsp) {
    const bool finished = (DS_BUF_EMPTY == sinkOut.bufStatus && sinkOut.queryEnd);

    qwBuildFetchRsp(fetchRsp, &sinkOut, rspLen, finished);
    if (finished) {
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
    }
  } else {
    // no data yet: remember the fetch so it can be answered later
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
  }

  const bool bufDrained = (DS_BUF_LOW == sinkOut.bufStatus) || (DS_BUF_EMPTY == sinkOut.bufStatus);
  if (bufDrained && !sinkOut.queryEnd) {
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sinkOut.bufStatus));

    QW_LOCK(QW_WRITE, &ctx->lock);
    ctxLocked = true;

    // RC WARNING
    if (-1 == ctx->phase || false == ctx->queryGotData) {
      QW_TASK_DLOG_E("task query unfinished");
    } else if (QW_QUERY_RUNNING(ctx)) {
      atomic_store_8((int8_t *)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);

      atomic_store_8((int8_t *)&ctx->queryInQueue, 1);

      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
    }
  }

_return:

  if (ctxLocked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  phaseInput.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &phaseInput, NULL);

  if (code) {
    // on failure only the error code is sent back, never a payload
    qwFreeFetchRsp(fetchRsp);
    fetchRsp = NULL;
    rspLen = 0;
  }

  if (code || fetchRsp) {
    bool alreadyReplied = false;
    if (ctx) {
      qwDbgSimulateRedirect(qwMsg, ctx, &alreadyReplied);
      qwDbgSimulateDead(QW_FPARAMS(), ctx, &alreadyReplied);
    }

    if (alreadyReplied) {
      qwFreeFetchRsp(fetchRsp);
      fetchRsp = NULL;
    } else {
      qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, fetchRsp, rspLen, code);
      QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
                   qwMsg->connInfo.handle, code, tstrerror(code), rspLen);
    }
  }

  QW_RET(TSDB_CODE_SUCCESS);
}
D
dapan1121 已提交
921

D
dapan1121 已提交
922
/*
 * Handle a drop request for a task.
 *
 * With the task context write-locked: a task that already received a drop
 * event is rejected as a duplicated operation; a task whose query is running
 * is killed, moved to the DROP status and flagged with QW_EVENT_DROP; any
 * other task is dropped immediately.
 *
 * On error the response code is stored in the context (when one was acquired)
 * and the task status is set to FAIL. Always returns TSDB_CODE_SUCCESS.
 */
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;
  bool        ctxLocked = false;

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  QW_LOCK(QW_WRITE, &ctx->lock);
  ctxLocked = true;

  if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
    QW_TASK_WLOG_E("task already dropping");
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

  if (!QW_QUERY_RUNNING(ctx)) {
    // nothing is executing, so the task can be removed right away
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
  } else {
    // the query is still running: kill it and record the pending drop
    QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED));
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);

    QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED);
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }

_return:

  if (code) {
    if (ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }

    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
  }

  if (ctxLocked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  if (ctx) {
    qwReleaseTaskCtx(mgmt, ctx);
  }

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
973
/*
 * Handle a scheduler heartbeat request.
 *
 * If the message carries an error code, the request is forwarded to
 * qwProcessHbLinkBroken() and its result is returned. Otherwise the scheduler
 * entry is acquired (added if missing), the broken-link argument is
 * registered, and the scheduler's heartbeat connection and endpoint id are
 * replaced by the ones from this request (any previously stored connection
 * handle is released first).
 *
 * A heartbeat response carrying the request's epId and the resulting code is
 * always sent; on error the request's own connection handle is released
 * afterwards. Returns TSDB_CODE_SUCCESS on the normal path.
 */
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t         code = 0;
  SSchedulerHbRsp rsp = {0};
  SQWSchStatus   *sch = NULL;
  bool            schAcquired = false;

  if (qwMsg->code) {
    QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
  }

  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
  schAcquired = true;

  QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));

  sch->hbBrokenTs = 0;

  QW_LOCK(QW_WRITE, &sch->hbConnLock);

  if (sch->hbConnInfo.handle) {
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
    sch->hbConnInfo.handle = NULL;
  }

  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));

  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);

  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
          req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);

_return:

  // Release on every path: a failure after the acquire above used to jump
  // here with the scheduler still held.
  if (schAcquired) {
    qwReleaseScheduler(QW_READ, mgmt);
  }

  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));

  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);

  if (code) {
    tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
    qwMsg->connInfo.handle = NULL;
  }

  QW_RET(TSDB_CODE_SUCCESS);
}

void qwProcessHbTimerEvent(void *param, void *tmrId) {
1021
  SQWHbParam *hbParam = (SQWHbParam *)param;
D
dapan1121 已提交
1022 1023 1024 1025
  if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
    return;
  }

1026
  int64_t   refId = hbParam->refId;
D
dapan1121 已提交
1027 1028 1029 1030 1031
  SQWorker *mgmt = qwAcquire(refId);
  if (NULL == mgmt) {
    QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
    return;
  }
1032

D
dapan1121 已提交
1033
  SQWSchStatus *sch = NULL;
dengyihao's avatar
dengyihao 已提交
1034
  int32_t       taskNum = 0;
1035
  SQWHbInfo    *rspList = NULL;
D
dapan1121 已提交
1036
  SArray       *pExpiredSch = NULL;
dengyihao's avatar
dengyihao 已提交
1037
  int32_t       code = 0;
D
dapan1121 已提交
1038

D
dapan1121 已提交
1039 1040
  qwDbgDumpMgmtInfo(mgmt);

1041 1042 1043 1044
  if (gQWDebug.forceStop) {
    (void)qwStopAllTasks(mgmt);
  }

D
dapan1121 已提交
1045 1046 1047 1048 1049
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1050
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1051
    qwRelease(refId);
D
dapan1121 已提交
1052
    return;
D
dapan1121 已提交
1053 1054
  }

wafwerar's avatar
wafwerar 已提交
1055
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
1056 1057
  pExpiredSch = taosArrayInit(schNum, sizeof(uint64_t));
  if (NULL == rspList || NULL == pExpiredSch) {
D
dapan1121 已提交
1058
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
1059 1060
    taosMemoryFree(rspList);
    taosArrayDestroy(pExpiredSch);
D
dapan1121 已提交
1061 1062
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
1063
    qwRelease(refId);
D
dapan1121 已提交
1064
    return;
D
dapan1121 已提交
1065 1066
  }

1067
  void   *key = NULL;
dengyihao's avatar
dengyihao 已提交
1068
  size_t  keyLen = 0;
D
dapan1121 已提交
1069
  int32_t i = 0;
D
dapan1121 已提交
1070
  int64_t currentMs = taosGetTimestampMs();
D
dapan1121 已提交
1071 1072 1073

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
H
Haojun Liao 已提交
1074 1075
    SQWSchStatus *sch1 = (SQWSchStatus *)pIter;
    if (NULL == sch1->hbConnInfo.handle) {
D
dapan1121 已提交
1076
      uint64_t *sId = taosHashGetKey(pIter, NULL);
D
dapan1121 已提交
1077
      QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
D
dapan1121 已提交
1078

H
Haojun Liao 已提交
1079 1080
      if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
          taosHashGetSize(sch1->tasksHash) <= 0) {
D
dapan1121 已提交
1081 1082
        taosArrayPush(pExpiredSch, sId);
      }
1083

D
dapan1121 已提交
1084 1085 1086
      pIter = taosHashIterate(mgmt->schHash, pIter);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1087

D
dapan1121 已提交
1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
1103
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
1104 1105
    /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
    /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
D
dapan1121 已提交
1106
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
1107 1108
  }

D
dapan1121 已提交
1109
  if (taosArrayGetSize(pExpiredSch) > 0) {
D
dapan1121 已提交
1110
    qwClearExpiredSch(mgmt, pExpiredSch);
D
dapan1121 已提交
1111 1112
  }

wafwerar's avatar
wafwerar 已提交
1113
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
1114
  taosArrayDestroy(pExpiredSch);
D
dapan1121 已提交
1115

dengyihao's avatar
dengyihao 已提交
1116
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
1117
  qwRelease(refId);
D
dapan1121 已提交
1118 1119
}

1120
/*
 * Execute a delete request synchronously and collect its result.
 *
 * The message payload is decoded into a subplan, an execution task is created
 * for it in batch mode, the task is executed with a stack-local task context,
 * and the delete result is read from the data sink into pRes.
 *
 * Returns TSDB_CODE_SUCCESS when pRes has been filled, otherwise the error
 * code of the step that failed (TSDB_CODE_INVALID_MSG when the payload cannot
 * be decoded). The local task context is freed on every path.
 */
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
  int32_t        code = 0;
  SSubplan      *plan = NULL;
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx     ctx = {0};

  code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
  if (TSDB_CODE_SUCCESS != code) {
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  ctx.taskHandle = pTaskInfo;
  ctx.sinkHandle = sinkHandle;

  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL));

  QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, pRes));

_return:

  qwFreeTaskCtx(&ctx);

  // Propagate the failure: no response is sent from here, so the return value
  // is the only way the caller learns that pRes was not filled.
  QW_RET(code);
}

D
dapan1121 已提交
1159 1160
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
  if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
D
dapan1121 已提交
1161 1162 1163
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
S
Shengliang 已提交
1164

D
dapan1121 已提交
1165 1166 1167 1168
  int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
  if (1 == qwNum) {
    memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
  }
D
dapan1121 已提交
1169 1170 1171 1172 1173 1174 1175 1176

  int32_t code = qwOpenRef();
  if (code) {
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
    QW_RET(code);
  }

  SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
D
dapan1121 已提交
1177
  if (NULL == mgmt) {
D
dapan1121 已提交
1178 1179
    qError("calloc %d failed", (int32_t)sizeof(SQWorker));
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
S
Shengliang Guan 已提交
1180
    QW_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1181 1182
  }

D
dapan1121 已提交
1183 1184 1185
  mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
  mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
  mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1186

dengyihao's avatar
dengyihao 已提交
1187 1188
  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                               HASH_ENTRY_LOCK);
D
dapan1121 已提交
1189
  if (NULL == mgmt->schHash) {
wafwerar's avatar
wafwerar 已提交
1190
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1191
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
S
Shengliang Guan 已提交
1192
    QW_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1193 1194
  }

dengyihao's avatar
dengyihao 已提交
1195 1196
  mgmt->ctxHash =
      taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1197
  if (NULL == mgmt->ctxHash) {
D
dapan1121 已提交
1198
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
S
Shengliang Guan 已提交
1199
    QW_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1200 1201 1202 1203 1204
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
S
Shengliang Guan 已提交
1205
    QW_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1206 1207
  }

D
dapan1121 已提交
1208 1209
  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
D
dapan1121 已提交
1210 1211 1212 1213 1214
  if (pMsgCb) {
    mgmt->msgCb = *pMsgCb;
  } else {
    memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb));
  }
D
dapan1121 已提交
1215

D
dapan1121 已提交
1216 1217 1218 1219 1220 1221
  mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
  if (mgmt->refId < 0) {
    qError("taosAddRef qw failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(terrno);
  }

D
dapan1121 已提交
1222 1223 1224
  SQWHbParam *param = NULL;
  qwSetHbParam(mgmt->refId, &param);

1225
  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer);
D
dapan1121 已提交
1226 1227
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
S
Shengliang Guan 已提交
1228
    QW_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1229
  }
1230

D
dapan1121 已提交
1231 1232
  *qWorkerMgmt = mgmt;

S
Shengliang Guan 已提交
1233
  qDebug("qworker initialized, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
D
dapan1121 已提交
1234

D
dapan1121 已提交
1235
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1236 1237 1238

_return:

D
dapan1121 已提交
1239 1240 1241 1242 1243 1244 1245
  if (mgmt->refId >= 0) {
    qwRelease(mgmt->refId);
  } else {
    taosHashCleanup(mgmt->schHash);
    taosHashCleanup(mgmt->ctxHash);
    taosTmrCleanUp(mgmt->timer);
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1246

1247
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1248
  }
1249

D
dapan1121 已提交
1250
  QW_RET(code);
D
dapan1121 已提交
1251
}
D
dapan1121 已提交
1252

D
dapan1121 已提交
1253
void qWorkerStopAllTasks(void *qWorkerMgmt) {
D
dapan1121 已提交
1254 1255 1256
  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;

  QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
D
dapan1121 已提交
1257

1258 1259
  atomic_store_8(&mgmt->nodeStopped, 1);

1260
  (void)qwStopAllTasks(mgmt);
D
dapan1121 已提交
1261 1262
}

D
dapan1121 已提交
1263 1264 1265
void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1266
  }
D
dapan 已提交
1267

H
Hongze Cheng 已提交
1268
  int32_t   destroyed = 0;
D
dapan1121 已提交
1269
  SQWorker *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1270
  mgmt->destroyed = &destroyed;
D
dapan1121 已提交
1271

D
dapan1121 已提交
1272 1273
  if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
    qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
D
dapan1121 已提交
1274 1275 1276 1277 1278
    return;
  }

  while (0 == destroyed) {
    taosMsleep(2);
D
dapan1121 已提交
1279
  }
D
dapan1121 已提交
1280
}
D
dapan1121 已提交
1281

D
dapan1121 已提交
1282 1283 1284 1285 1286
/*
 * Fill pStat with a snapshot of the worker's statistics: cached data-sink
 * size, per-message-type processed counters, current query/fetch queue
 * lengths (obtained through the handle's message callbacks) and the time
 * values reported by qwGetTimeInQueue() for both queues.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT if any argument is NULL, otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat) {
  if (!(handle && qWorkerMgmt && pStat)) {
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;

  // data sink cache usage
  SDataSinkStat sinkStat = {0};
  dsDataSinkGetCacheSize(&sinkStat);
  pStat->cacheDataSize = sinkStat.cachedSize;

  // processed-message counters
  pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed);
  pStat->cqueryProcessed = QW_STAT_GET(mgmt->stat.msgStat.cqueryProcessed);
  pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed);
  pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed);
  pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed);
  pStat->deleteProcessed = QW_STAT_GET(mgmt->stat.msgStat.deleteProcessed);

  // queue lengths come from the owner of the queues via the message callbacks
  const SMsgCb *msgCb = handle->pMsgCb;
  pStat->numOfQueryInQueue = msgCb->qsizeFp(msgCb->mgmt, mgmt->nodeId, QUERY_QUEUE);
  pStat->numOfFetchInQueue = msgCb->qsizeFp(msgCb->mgmt, mgmt->nodeId, FETCH_QUEUE);

  pStat->timeInQueryQueue = qwGetTimeInQueue(mgmt, QUERY_QUEUE);
  pStat->timeInFetchQueue = qwGetTimeInQueue(mgmt, FETCH_QUEUE);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1307

H
Hongze Cheng 已提交
1308 1309 1310
/*
 * Run a query locally (no network message involved).
 *
 * qwMsg->msg is used directly as the subplan. A task context and status are
 * added for the task, the pre-QUERY phase hook runs, the context is marked as
 * local execution, an execution task is created in batch mode with a
 * temporary SMsgCb whose clientRpc is the message's connection handle, and
 * the task is executed once. explainRes is stored in the context.
 *
 * The post-QUERY phase hook always runs; its result is stored as the
 * context's response code (when a context was acquired) and returned.
 */
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId,
                                 SQWMsg *qwMsg, SArray *explainRes) {
  SQWorker      *mgmt = (SQWorker *)pMgmt;
  int32_t        code = 0;
  SQWTaskCtx    *ctx = NULL;
  SSubplan      *plan = (SSubplan *)qwMsg->msg;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SReadHandle    rHandle = {0};

  QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));
  QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));

  ctx->taskType = qwMsg->msgInfo.taskType;
  ctx->explain = qwMsg->msgInfo.explain;
  ctx->needFetch = qwMsg->msgInfo.needFetch;
  ctx->queryMsgType = qwMsg->msgType;
  ctx->localExec = true;
  ctx->explainRes = explainRes;

  rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
  if (NULL == rHandle.pMsgCb) {
    // the allocation result is dereferenced right below, so fail cleanly
    QW_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgCb));
    QW_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
  rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;

  code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  ctx->level = plan->level;
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));

_return:

  // the temporary callback block is only needed while the task is created
  taosMemoryFree(rHandle.pMsgCb);

  input.code = code;
  input.msgType = qwMsg->msgType;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);

  if (ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  QW_RET(code);
}

H
Hongze Cheng 已提交
1368 1369 1370 1371 1372 1373 1374 1375
/*
 * Fetch the next result of a locally executed query.
 *
 * After the pre-FETCH phase hook, the data sink is polled for a response; as
 * long as none is available the task is executed again. Once a response is
 * obtained it is finalized, and the context is flagged as ended when the sink
 * is empty and the query has finished.
 *
 * *pRsp receives the response pointer (NULL if none was produced). The return
 * value is the result of the post-FETCH phase hook, which is given the code
 * of the steps above.
 */
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId,
                                 void **pRsp, SArray *explainRes) {
  SQWorker     *mgmt = (SQWorker *)pMgmt;
  int32_t       code = 0;
  int32_t       rspLen = 0;
  SQWTaskCtx   *ctx = NULL;
  void         *fetchRsp = NULL;
  bool          execStopped = false;
  SQWPhaseInput phaseInput = {0};

  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &phaseInput, NULL));

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));

  ctx->fetchMsgType = TDMT_SCH_MERGE_FETCH;
  ctx->explainRes = explainRes;

  SOutputData sinkOut = {0};

  for (;;) {
    QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &rspLen, &fetchRsp, &sinkOut));

    if (NULL != fetchRsp) {
      const bool finished = (DS_BUF_EMPTY == sinkOut.bufStatus && sinkOut.queryEnd);

      qwBuildFetchRsp(fetchRsp, &sinkOut, rspLen, finished);
      if (finished) {
        atomic_store_8((int8_t *)&ctx->queryEnd, true);
      }

      break;
    }

    // the sink has nothing to hand out yet: run the task and poll again
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &execStopped));
  }

_return:

  *pRsp = fetchRsp;

  phaseInput.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &phaseInput, NULL);

  QW_RET(code);
}