qworker.c 32.8 KB
Newer Older
1 2
#include "qworker.h"

dengyihao's avatar
dengyihao 已提交
3
#include "dataSinkMgt.h"
4
#include "executor.h"
D
dapan1121 已提交
5
#include "planner.h"
H
Haojun Liao 已提交
6
#include "query.h"
D
dapan1121 已提交
7 8
#include "qwInt.h"
#include "qwMsg.h"
dengyihao's avatar
dengyihao 已提交
9
#include "tcommon.h"
H
Haojun Liao 已提交
10
#include "tmsg.h"
11
#include "tname.h"
H
Haojun Liao 已提交
12
#include "tdatablock.h"
D
dapan1121 已提交
13

D
dapan1121 已提交
14
SQWorkerMgmt gQwMgmt = {
15 16 17
    .lock = 0,
    .qwRef = -1,
    .qwNum = 0,
D
dapan1121 已提交
18
};
19

H
Haojun Liao 已提交
20 21 22 23 24
static void freeBlock(void* param) {
  SSDataBlock* pBlock = *(SSDataBlock**)param;
  blockDataDestroy(pBlock);
}

D
dapan1121 已提交
25 26 27 28
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t         code = 0;
  SSchedulerHbRsp rsp = {0};
  SQWSchStatus   *sch = NULL;
D
dapan1121 已提交
29

D
dapan1121 已提交
30
  QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch));
D
dapan1121 已提交
31

D
dapan1121 已提交
32
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
33

D
dapan1121 已提交
34
  sch->hbBrokenTs = taosGetTimestampMs();
35

D
dapan1121 已提交
36 37 38 39
  if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
    sch->hbConnInfo.handle = NULL;
    sch->hbConnInfo.ahandle = NULL;
D
dapan1121 已提交
40

D
dapan1121 已提交
41 42 43
    QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
  } else {
    QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
D
dapan1121 已提交
44
  }
D
dapan1121 已提交
45

D
dapan1121 已提交
46
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
47

D
dapan1121 已提交
48
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
49

D
dapan1121 已提交
50
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
51 52
}

53 54
static void freeItem(void *param) {
  SExplainExecInfo *pInfo = param;
H
Haojun Liao 已提交
55 56 57
  taosMemoryFree(pInfo->verboseInfo);
}

D
dapan1121 已提交
58
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
D
dapan1121 已提交
59
  qTaskInfo_t taskHandle = ctx->taskHandle;
D
dapan1121 已提交
60

D
dapan1121 已提交
61
  if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
D
dapan1121 已提交
62
    if (ctx->explain) {
63
      SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
H
Haojun Liao 已提交
64
      QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
65 66 67

      SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
      connInfo.ahandle = NULL;
H
Haojun Liao 已提交
68 69 70
      int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
      taosArrayDestroyEx(execInfoList, freeItem);
      QW_ERR_RET(code);
D
dapan1121 已提交
71
    }
D
dapan1121 已提交
72 73 74 75

    if (!ctx->needFetch) {
      dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL);
    }
D
dapan1121 已提交
76 77 78 79
  }

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
80

81
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
dengyihao's avatar
dengyihao 已提交
82 83 84 85 86
  int32_t        code = 0;
  bool           qcontinue = true;
  uint64_t       useconds = 0;
  int32_t        i = 0;
  int32_t        execNum = 0;
D
dapan1121 已提交
87
  qTaskInfo_t    taskHandle = ctx->taskHandle;
D
dapan1121 已提交
88
  DataSinkHandle sinkHandle = ctx->sinkHandle;
dengyihao's avatar
dengyihao 已提交
89

90
  SArray *pResList = taosArrayInit(4, POINTER_BYTES);
D
dapan1121 已提交
91
  while (true) {
H
Haojun Liao 已提交
92
    QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
D
dapan1121 已提交
93

D
dapan1121 已提交
94 95
    // if *taskHandle is NULL, it's killed right now
    if (taskHandle) {
D
dapan1121 已提交
96
      qwDbgSimulateSleep();
H
Haojun Liao 已提交
97

98
      code = qExecTaskOpt(taskHandle, pResList, &useconds);
D
dapan1121 已提交
99
      if (code) {
100 101 102 103 104
        if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
          QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        } else {
          QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        }
105
        QW_ERR_JRET(code);
D
dapan1121 已提交
106
      }
D
dapan1121 已提交
107 108
    }

D
dapan1121 已提交
109 110
    ++execNum;

H
Haojun Liao 已提交
111
    if (taosArrayGetSize(pResList) == 0) {
dengyihao's avatar
dengyihao 已提交
112
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
D
dapan1121 已提交
113
      dsEndPut(sinkHandle, useconds);
D
dapan1121 已提交
114

115
      QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
D
dapan1121 已提交
116

117 118
      if (queryStop) {
        *queryStop = true;
D
dapan1121 已提交
119
      }
dengyihao's avatar
dengyihao 已提交
120

D
dapan1121 已提交
121 122 123
      break;
    }

124
    for (int32_t j = 0; j < taosArrayGetSize(pResList); ++j) {
H
Haojun Liao 已提交
125 126
      SSDataBlock *pRes = taosArrayGetP(pResList, j);
      ASSERT(pRes->info.rows > 0);
H
Haojun Liao 已提交
127

H
Haojun Liao 已提交
128 129 130 131
      SInputData inputData = {.pData = pRes};
      code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
      if (code) {
        QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
132
        QW_ERR_JRET(code);
H
Haojun Liao 已提交
133
      }
134

H
Haojun Liao 已提交
135
      QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
D
dapan1121 已提交
136
    }
D
dapan1121 已提交
137

D
dapan1121 已提交
138
    if (!qcontinue) {
139 140 141
      if (queryStop) {
        *queryStop = true;
      }
142

D
dapan1121 已提交
143 144 145
      break;
    }

D
dapan1121 已提交
146
    if (ctx->needFetch && (!ctx->queryRsped) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
D
dapan1121 已提交
147 148 149
      break;
    }

D
dapan1121 已提交
150
    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
151 152
      break;
    }
D
dapan1121 已提交
153

D
dapan1121 已提交
154 155 156
    if (atomic_load_32(&ctx->rspCode)) {
      break;
    }
D
dapan1121 已提交
157 158
  }

159
_return:
H
Haojun Liao 已提交
160
  taosArrayDestroyEx(pResList, freeBlock);
D
dapan1121 已提交
161
  QW_RET(code);
D
dapan1121 已提交
162
}
D
dapan1121 已提交
163

D
dapan1121 已提交
164
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
D
dapan1121 已提交
165 166
  int32_t taskNum = 0;

D
dapan1121 已提交
167
  hbInfo->connInfo = sch->hbConnInfo;
D
dapan1121 已提交
168
  hbInfo->rsp.epId = sch->hbEpId;
D
dapan1121 已提交
169

D
dapan1121 已提交
170
  QW_LOCK(QW_READ, &sch->tasksLock);
dengyihao's avatar
dengyihao 已提交
171

D
dapan1121 已提交
172
  taskNum = taosHashGetSize(sch->tasksHash);
D
dapan1121 已提交
173 174 175

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
D
dapan1121 已提交
176
    QW_UNLOCK(QW_READ, &sch->tasksLock);
D
dapan1121 已提交
177
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
D
dapan1121 已提交
178 179 180
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

181
  void       *key = NULL;
dengyihao's avatar
dengyihao 已提交
182 183
  size_t      keyLen = 0;
  int32_t     i = 0;
D
dapan1121 已提交
184
  STaskStatus status = {0};
D
dapan1121 已提交
185 186 187 188

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
D
dapan1121 已提交
189
    key = taosHashGetKey(pIter, &keyLen);
D
dapan1121 已提交
190

dengyihao's avatar
dengyihao 已提交
191
    // TODO GET EXECUTOR API TO GET MORE INFO
D
dapan1121 已提交
192

D
dapan1121 已提交
193
    QW_GET_QTID(key, status.queryId, status.taskId, status.execId);
D
dapan1121 已提交
194 195
    status.status = taskStatus->status;
    status.refId = taskStatus->refId;
dengyihao's avatar
dengyihao 已提交
196

D
dapan1121 已提交
197
    taosArrayPush(hbInfo->rsp.taskStatus, &status);
dengyihao's avatar
dengyihao 已提交
198

D
dapan1121 已提交
199 200
    ++i;
    pIter = taosHashIterate(sch->tasksHash, pIter);
dengyihao's avatar
dengyihao 已提交
201
  }
D
dapan1121 已提交
202 203 204 205 206 207

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
208
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
D
dapan1121 已提交
209
  int64_t            len = 0;
D
dapan1121 已提交
210
  SRetrieveTableRsp *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
211 212
  bool               queryEnd = false;
  int32_t            code = 0;
213
  SOutputData        output = {0};
D
dapan1121 已提交
214

215
  *dataLen = 0;
D
dapan1121 已提交
216

217 218
  while (true) {
    dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
D
dapan1121 已提交
219

220 221 222 223
    if (len < 0) {
      QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
      QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }
dengyihao's avatar
dengyihao 已提交
224

225 226 227 228 229 230 231
    if (len == 0) {
      if (queryEnd) {
        code = dsGetDataBlock(ctx->sinkHandle, &output);
        if (code) {
          QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
          QW_ERR_RET(code);
        }
H
Haojun Liao 已提交
232

233 234
        QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks,
                     pOutput->numOfRows);
235

236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
        qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
        if (NULL == rsp) {
          QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
          *pOutput = output;
        } else {
          pOutput->queryEnd = output.queryEnd;
          pOutput->bufStatus = output.bufStatus;
          pOutput->useconds = output.useconds;
        }

        break;
      }

      pOutput->bufStatus = DS_BUF_EMPTY;

      break;
D
dapan1121 已提交
252
    }
D
dapan1121 已提交
253

254 255
    // Got data from sink
    QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
D
dapan1121 已提交
256

257
    *dataLen += len;
D
dapan1121 已提交
258

259
    QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp));
D
dapan1121 已提交
260

261 262 263 264 265 266
    output.pData = rsp->data + *dataLen - len;
    code = dsGetDataBlock(ctx->sinkHandle, &output);
    if (code) {
      QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
    }
dengyihao's avatar
dengyihao 已提交
267

268 269 270 271 272 273 274 275 276 277
    pOutput->queryEnd = output.queryEnd;
    pOutput->precision = output.precision;
    pOutput->bufStatus = output.bufStatus;
    pOutput->useconds = output.useconds;
    pOutput->compressed = output.compressed;
    pOutput->numOfCols = output.numOfCols;
    pOutput->numOfRows += output.numOfRows;
    pOutput->numOfBlocks++;

    if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
278 279
      QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks,
                   pOutput->numOfRows);
280 281 282
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
      break;
    }
dengyihao's avatar
dengyihao 已提交
283

284 285 286 287
    if (0 == ctx->level) {
      QW_TASK_DLOG("task fetched blocks %d rows %d, level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level);
      break;
    }
D
dapan1121 已提交
288

289 290 291 292
    if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
      QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
      break;
    }
D
dapan1121 已提交
293 294
  }

295 296
  *rspMsg = rsp;

D
dapan1121 已提交
297
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
298 299
}

300
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
301 302 303 304
  int64_t     len = 0;
  bool        queryEnd = false;
  int32_t     code = 0;
  SOutputData output = {0};
D
dapan1121 已提交
305 306 307

  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);

D
dapan1121 已提交
308
  if (len <= 0 || len != sizeof(SDeleterRes)) {
D
dapan1121 已提交
309
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
D
dapan1121 已提交
310 311 312 313 314 315 316
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  output.pData = taosMemoryCalloc(1, len);
  if (NULL == output.pData) {
    QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
317

D
dapan1121 已提交
318 319 320 321 322 323 324
  code = dsGetDataBlock(ctx->sinkHandle, &output);
  if (code) {
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
    taosMemoryFree(output.pData);
    QW_ERR_RET(code);
  }

325 326
  SDeleterRes *pDelRes = (SDeleterRes *)output.pData;

D
dapan1121 已提交
327
  pRes->suid = pDelRes->suid;
D
dapan1121 已提交
328 329 330
  pRes->uidList = pDelRes->uidList;
  pRes->skey = pDelRes->skey;
  pRes->ekey = pDelRes->ekey;
331
  pRes->affectedRows = pDelRes->affectedRows;
332 333
  strcpy(pRes->tableFName, pDelRes->tableName);
  strcpy(pRes->tsColName, pDelRes->tsColName);
D
dapan1121 已提交
334
  taosMemoryFree(output.pData);
335

D
dapan1121 已提交
336 337 338
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
339
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
340 341
  int32_t     code = 0;
  SQWTaskCtx *ctx = NULL;
D
dapan1121 已提交
342

D
dapan1121 已提交
343
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
344

D
dapan1121 已提交
345
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
346

D
dapan1121 已提交
347
  QW_LOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
348

D
dapan1121 已提交
349
  QW_SET_PHASE(ctx, phase);
D
dapan1121 已提交
350

dengyihao's avatar
dengyihao 已提交
351
  if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
D
dapan1121 已提交
352 353 354
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }
D
dapan1121 已提交
355

D
dapan1121 已提交
356 357
  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
D
dapan1121 已提交
358
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
H
Haojun Liao 已提交
359
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
360
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
361 362
        break;
      }
D
dapan1121 已提交
363

D
dapan1121 已提交
364
      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
365
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
366

367 368
        // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
369

D
dapan1121 已提交
370
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan 已提交
371
        break;
D
dapan1121 已提交
372
      }
D
dapan1121 已提交
373

D
dapan1121 已提交
374
      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
D
dapan1121 已提交
375 376
      break;
    }
D
dapan1121 已提交
377
    case QW_PHASE_PRE_FETCH: {
D
dapan1121 已提交
378
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
379
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
380 381
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }
D
dapan1121 已提交
382

D
dapan1121 已提交
383
      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
384
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
385 386 387
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

D
dapan1121 已提交
388
      if (!ctx->queryRsped) {
D
dapan1121 已提交
389
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
390
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
D
dapan1121 已提交
391 392
      }
      break;
dengyihao's avatar
dengyihao 已提交
393
    }
D
dapan1121 已提交
394
    case QW_PHASE_PRE_CQUERY: {
D
dapan1121 已提交
395
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
396
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
397
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
398
      }
D
dapan1121 已提交
399

D
dapan1121 已提交
400
      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
401
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
H
Haojun Liao 已提交
402

403 404
        // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
405

D
dapan1121 已提交
406
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
407
      }
D
dapan1121 已提交
408

D
dapan1121 已提交
409
      break;
D
dapan1121 已提交
410 411 412 413 414 415 416
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
417
    QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode));
D
dapan1121 已提交
418
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
419
  }
D
dapan1121 已提交
420

D
dapan1121 已提交
421
_return:
D
dapan1121 已提交
422

D
dapan1121 已提交
423
  if (ctx) {
D
dapan1121 已提交
424
    QW_UPDATE_RSP_CODE(ctx, code);
dengyihao's avatar
dengyihao 已提交
425

D
dapan1121 已提交
426
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
427 428
    qwReleaseTaskCtx(mgmt, ctx);
  }
D
dapan1121 已提交
429

430 431 432 433 434
  if (code != TSDB_CODE_SUCCESS) {
    QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  } else {
    QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  }
D
dapan1121 已提交
435 436 437 438 439

  QW_RET(code);
}

int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
440 441 442
  int32_t        code = 0;
  SQWTaskCtx    *ctx = NULL;
  SRpcHandleInfo connInfo = {0};
D
dapan1121 已提交
443

D
dapan1121 已提交
444
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
dengyihao's avatar
dengyihao 已提交
445

D
dapan1121 已提交
446
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
447

D
dapan1121 已提交
448 449
  QW_LOCK(QW_WRITE, &ctx->lock);

D
dapan1121 已提交
450
  if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
451
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
452 453 454
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

D
dapan1121 已提交
455
  if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
456 457 458 459
    if (QW_PHASE_POST_FETCH == phase) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }
D
dapan1121 已提交
460

461 462
    // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
463

D
dapan1121 已提交
464 465
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
466 467 468
  }

  if (ctx->rspCode) {
dengyihao's avatar
dengyihao 已提交
469 470
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
D
dapan1121 已提交
471
    QW_ERR_JRET(ctx->rspCode);
dengyihao's avatar
dengyihao 已提交
472
  }
D
dapan1121 已提交
473

D
dapan1121 已提交
474
  QW_ERR_JRET(input->code);
D
dapan1121 已提交
475 476 477

_return:

D
dapan1121 已提交
478
  if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
D
dapan1121 已提交
479
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
D
dapan1121 已提交
480 481
  }

D
dapan1121 已提交
482
  if (QW_PHASE_POST_QUERY == phase && ctx) {
D
dapan1121 已提交
483
    ctx->queryRsped = true;
D
dapan1121 已提交
484

485
    bool   rsped = false;
D
dapan1121 已提交
486
    SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
D
dapan1121 已提交
487 488
    qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
    qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
D
dapan1121 已提交
489 490
    if (!rsped) {
      qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
D
dapan1121 已提交
491
      QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
492
    }
493 494
  }

D
dapan1121 已提交
495
  if (ctx) {
D
dapan1121 已提交
496
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
497

D
dapan1121 已提交
498
    QW_SET_PHASE(ctx, phase);
dengyihao's avatar
dengyihao 已提交
499

D
dapan1121 已提交
500
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
501
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
502 503
  }

D
dapan1121 已提交
504
  if (code) {
D
dapan1121 已提交
505
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
D
dapan1121 已提交
506 507
  }

D
dapan1121 已提交
508
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
509

D
dapan1121 已提交
510 511 512
  QW_RET(code);
}

D
dapan1121 已提交
513 514 515 516 517 518
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
  QW_ERR_RET(qwDropTask(QW_FPARAMS()));

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
519
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
520 521 522 523 524 525 526
  int32_t        code = 0;
  bool           queryRsped = false;
  SSubplan      *plan = NULL;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx    *ctx = NULL;
D
dapan1121 已提交
527

D
dapan1121 已提交
528
  QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
D
dapan1121 已提交
529

D
dapan1121 已提交
530 531 532
  QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
533

D
dapan1121 已提交
534 535
  ctx->ctrlConnInfo = qwMsg->connInfo;

D
dapan1121 已提交
536
  QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
D
dapan1121 已提交
537 538 539 540 541 542 543 544 545 546 547

_return:

  if (ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  QW_RET(TSDB_CODE_SUCCESS);
}

548
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
D
dapan1121 已提交
549 550 551 552 553 554 555 556
  int32_t        code = 0;
  bool           queryRsped = false;
  SSubplan      *plan = NULL;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx    *ctx = NULL;

D
dapan1121 已提交
557
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
D
dapan1121 已提交
558 559

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
560

D
dapan1121 已提交
561 562 563
  ctx->taskType = qwMsg->msgInfo.taskType;
  ctx->explain = qwMsg->msgInfo.explain;
  ctx->needFetch = qwMsg->msgInfo.needFetch;
D
dapan1121 已提交
564
  ctx->msgType = qwMsg->msgType;
X
Xiaoyu Wang 已提交
565

566
  // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
X
Xiaoyu Wang 已提交
567

568
  code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
D
dapan1121 已提交
569
  if (TSDB_CODE_SUCCESS != code) {
570 571
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
572
    QW_ERR_JRET(code);
D
dapan1121 已提交
573
  }
dengyihao's avatar
dengyihao 已提交
574

575
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
576
  sql = NULL;
D
dapan1121 已提交
577
  if (code) {
D
dapan1121 已提交
578
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
579
    QW_ERR_JRET(code);
D
dapan1121 已提交
580
  }
D
dapan1121 已提交
581

H
Haojun Liao 已提交
582
  if (NULL == sinkHandle || NULL == pTaskInfo) {
D
dapan1121 已提交
583 584 585 586
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

dengyihao's avatar
dengyihao 已提交
587 588
  // QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
  // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
589

dengyihao's avatar
dengyihao 已提交
590
  // queryRsped = true;
D
dapan1121 已提交
591

592
  ctx->level = plan->level;
D
dapan1121 已提交
593 594 595
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

D
dapan1121 已提交
596
  if (pTaskInfo && sinkHandle) {
597
    qwSaveTbVersionInfo(pTaskInfo, ctx);
D
dapan1121 已提交
598
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
D
dapan1121 已提交
599
  }
dengyihao's avatar
dengyihao 已提交
600

D
dapan1121 已提交
601 602
_return:

D
dapan1121 已提交
603
  taosMemoryFree(sql);
604

D
dapan1121 已提交
605
  input.code = code;
D
dapan1121 已提交
606
  input.msgType = qwMsg->msgType;
D
dapan1121 已提交
607
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
608

dengyihao's avatar
dengyihao 已提交
609
  // if (!queryRsped) {
D
dapan1121 已提交
610 611 612
  //  qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
  //  QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  //}
D
dapan1121 已提交
613

D
dapan1121 已提交
614
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
615 616
}

D
dapan1121 已提交
617
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
618
  SQWTaskCtx   *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
619
  int32_t       code = 0;
620
  SQWPhaseInput input = {0};
621
  void         *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
622
  int32_t       dataLen = 0;
623
  bool          queryStop = false;
dengyihao's avatar
dengyihao 已提交
624

D
dapan1121 已提交
625
  do {
D
dapan1121 已提交
626
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
D
dapan1121 已提交
627

D
dapan1121 已提交
628
    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
629

dengyihao's avatar
dengyihao 已提交
630 631
    atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t *)&ctx->queryContinue, 0);
D
dapan1121 已提交
632

633
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
D
dapan1121 已提交
634

D
dapan1121 已提交
635
    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
636
      SOutputData sOutput = {0};
D
dapan1121 已提交
637
      QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
dengyihao's avatar
dengyihao 已提交
638 639

      if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
640
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
dengyihao's avatar
dengyihao 已提交
641 642

        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
643
      }
dengyihao's avatar
dengyihao 已提交
644

D
dapan1121 已提交
645
      if (rsp) {
D
dapan1121 已提交
646
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
647

D
dapan1121 已提交
648
        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
649
        if (qComplete) {
dengyihao's avatar
dengyihao 已提交
650
          atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
651
        }
H
Haojun Liao 已提交
652

D
dapan1121 已提交
653
        qwMsg->connInfo = ctx->dataConnInfo;
dengyihao's avatar
dengyihao 已提交
654 655
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

D
dapan1121 已提交
656
        qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
D
dapan1121 已提交
657
        rsp = NULL;
658

dengyihao's avatar
dengyihao 已提交
659 660
        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
                     tstrerror(code), dataLen);
D
dapan1121 已提交
661
      } else {
dengyihao's avatar
dengyihao 已提交
662
        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
663 664 665
      }
    }

dengyihao's avatar
dengyihao 已提交
666
  _return:
667

D
dapan1121 已提交
668 669 670 671
    if (NULL == ctx) {
      break;
    }

D
dapan1121 已提交
672
    if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
dengyihao's avatar
dengyihao 已提交
673
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
674 675
      qwFreeFetchRsp(rsp);
      rsp = NULL;
dengyihao's avatar
dengyihao 已提交
676

D
dapan1121 已提交
677
      qwMsg->connInfo = ctx->dataConnInfo;
D
dapan1121 已提交
678
      qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code);
dengyihao's avatar
dengyihao 已提交
679 680
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                   0);
681
    }
D
dapan1121 已提交
682

D
dapan1121 已提交
683
    QW_LOCK(QW_WRITE, &ctx->lock);
684
    if (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
D
dapan1121 已提交
685 686
      // Note: query is not running anymore
      QW_SET_PHASE(ctx, 0);
dengyihao's avatar
dengyihao 已提交
687
      QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
688 689
      break;
    }
dengyihao's avatar
dengyihao 已提交
690
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
691
  } while (true);
D
dapan1121 已提交
692

D
dapan1121 已提交
693
  input.code = code;
dengyihao's avatar
dengyihao 已提交
694
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);
D
dapan1121 已提交
695

dengyihao's avatar
dengyihao 已提交
696
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
697
}
D
dapan1121 已提交
698

D
dapan1121 已提交
699
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
700 701 702
  int32_t       code = 0;
  int32_t       dataLen = 0;
  bool          locked = false;
703 704
  SQWTaskCtx   *ctx = NULL;
  void         *rsp = NULL;
D
dapan1121 已提交
705
  SQWPhaseInput input = {0};
D
dapan1121 已提交
706

D
dapan1121 已提交
707
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
708

709
  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
710

D
dapan1121 已提交
711
  ctx->msgType = qwMsg->msgType;
D
dapan1121 已提交
712
  ctx->dataConnInfo = qwMsg->connInfo;
D
dapan1121 已提交
713

D
dapan 已提交
714
  SOutputData sOutput = {0};
D
dapan1121 已提交
715
  QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
D
dapan1121 已提交
716

717 718
  if (NULL == rsp) {
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
D
dapan1121 已提交
719
  } else {
D
dapan1121 已提交
720
    bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
721

D
dapan1121 已提交
722
    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
723
    if (qComplete) {
dengyihao's avatar
dengyihao 已提交
724
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
725
    }
D
dapan1121 已提交
726 727
  }

dengyihao's avatar
dengyihao 已提交
728
  if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
729
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
D
dapan1121 已提交
730

D
dapan1121 已提交
731 732
    QW_LOCK(QW_WRITE, &ctx->lock);
    locked = true;
733

D
dapan1121 已提交
734
    // RC WARNING
D
dapan1121 已提交
735
    if (QW_QUERY_RUNNING(ctx)) {
dengyihao's avatar
dengyihao 已提交
736 737
      atomic_store_8((int8_t *)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
D
dapan1121 已提交
738
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);
D
dapan1121 已提交
739

dengyihao's avatar
dengyihao 已提交
740 741
      atomic_store_8((int8_t *)&ctx->queryInQueue, 1);

D
dapan1121 已提交
742
      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
743
    }
D
dapan 已提交
744
  }
dengyihao's avatar
dengyihao 已提交
745

D
dapan1121 已提交
746
_return:
D
dapan1121 已提交
747

D
dapan1121 已提交
748 749 750 751 752
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
D
dapan1121 已提交
753
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
D
dapan1121 已提交
754

D
dapan 已提交
755 756 757
  if (code) {
    qwFreeFetchRsp(rsp);
    rsp = NULL;
D
dapan1121 已提交
758
    dataLen = 0;
D
dapan1121 已提交
759 760 761
  }

  if (code || rsp) {
D
dapan1121 已提交
762
    bool rsped = false;
D
dapan1121 已提交
763
    if (ctx) {
764
      qwDbgSimulateRedirect(qwMsg, ctx, &rsped);
D
dapan1121 已提交
765 766
      qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
    }
D
dapan1121 已提交
767 768
    if (!rsped) {
      qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
769 770
      QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
                   qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
D
dapan1121 已提交
771
    }
D
dapan1121 已提交
772 773
  }

D
dapan1121 已提交
774
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
775
}
D
dapan1121 已提交
776

D
dapan1121 已提交
777
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
778
  int32_t     code = 0;
D
dapan1121 已提交
779
  bool        dropped = false;
D
dapan1121 已提交
780
  SQWTaskCtx *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
781
  bool        locked = false;
D
dapan1121 已提交
782

D
dapan1121 已提交
783
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
784

D
dapan1121 已提交
785 786 787 788
  QW_LOCK(QW_WRITE, &ctx->lock);

  locked = true;

D
dapan1121 已提交
789
  if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
790
    QW_TASK_WLOG_E("task already dropping");
D
dapan1121 已提交
791 792 793
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

D
dapan1121 已提交
794
  if (QW_QUERY_RUNNING(ctx)) {
D
dapan1121 已提交
795
    QW_ERR_JRET(qwKillTaskHandle(ctx));
D
dapan1121 已提交
796
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
D
dapan1121 已提交
797
  } else {
D
dapan1121 已提交
798
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
799
    dropped = true;
D
dapan1121 已提交
800
  }
D
dapan1121 已提交
801

D
dapan1121 已提交
802
  if (!dropped) {
D
dapan1121 已提交
803 804
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }
805

D
dapan1121 已提交
806
_return:
D
dapan1121 已提交
807

D
dapan1121 已提交
808
  if (code) {
D
dapan1121 已提交
809 810 811
    if (ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }
H
Haojun Liao 已提交
812

D
dapan1121 已提交
813
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
D
dapan1121 已提交
814 815
  }

D
dapan 已提交
816 817 818 819
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

D
dapan1121 已提交
820
  if (ctx) {
D
dapan1121 已提交
821
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
822 823
  }

D
dapan1121 已提交
824 825 826
  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
827
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
dengyihao's avatar
dengyihao 已提交
828
  int32_t         code = 0;
D
dapan1121 已提交
829
  SSchedulerHbRsp rsp = {0};
830
  SQWSchStatus   *sch = NULL;
D
dapan1121 已提交
831

D
dapan1121 已提交
832 833 834
  if (qwMsg->code) {
    QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
  }
D
dapan1121 已提交
835 836

  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
D
dapan1121 已提交
837 838
  QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));

D
dapan1121 已提交
839 840
  sch->hbBrokenTs = 0;

D
dapan1121 已提交
841
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
842

D
dapan1121 已提交
843
  if (sch->hbConnInfo.handle) {
844
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
845
    sch->hbConnInfo.handle = NULL;
D
dapan1121 已提交
846
  }
D
dapan1121 已提交
847

D
dapan1121 已提交
848
  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
D
dapan1121 已提交
849
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
dengyihao's avatar
dengyihao 已提交
850

D
dapan1121 已提交
851
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
dengyihao's avatar
dengyihao 已提交
852 853 854

  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
          req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
855

D
dapan1121 已提交
856 857 858 859
  qwReleaseScheduler(QW_READ, mgmt);

_return:

D
dapan1121 已提交
860 861
  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));

S
shm  
Shengliang Guan 已提交
862
  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
D
dapan1121 已提交
863 864

  if (code) {
865
    tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
866
    qwMsg->connInfo.handle = NULL;
D
dapan1121 已提交
867
  }
dengyihao's avatar
dengyihao 已提交
868

869
  /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/
dengyihao's avatar
dengyihao 已提交
870

D
dapan1121 已提交
871
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
872 873 874
}

void qwProcessHbTimerEvent(void *param, void *tmrId) {
875
  SQWHbParam *hbParam = (SQWHbParam *)param;
D
dapan1121 已提交
876 877 878 879
  if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
    return;
  }

880
  int64_t   refId = hbParam->refId;
D
dapan1121 已提交
881 882 883 884 885
  SQWorker *mgmt = qwAcquire(refId);
  if (NULL == mgmt) {
    QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
    return;
  }
886

D
dapan1121 已提交
887
  SQWSchStatus *sch = NULL;
dengyihao's avatar
dengyihao 已提交
888
  int32_t       taskNum = 0;
889
  SQWHbInfo    *rspList = NULL;
D
dapan1121 已提交
890
  SArray       *pExpiredSch = NULL;
dengyihao's avatar
dengyihao 已提交
891
  int32_t       code = 0;
D
dapan1121 已提交
892

D
dapan1121 已提交
893 894
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
895 896 897 898 899
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
900
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
901
    qwRelease(refId);
D
dapan1121 已提交
902
    return;
D
dapan1121 已提交
903 904
  }

wafwerar's avatar
wafwerar 已提交
905
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
906 907
  pExpiredSch = taosArrayInit(schNum, sizeof(uint64_t));
  if (NULL == rspList || NULL == pExpiredSch) {
D
dapan1121 已提交
908
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
909 910
    taosMemoryFree(rspList);
    taosArrayDestroy(pExpiredSch);
D
dapan1121 已提交
911 912
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
913
    qwRelease(refId);
D
dapan1121 已提交
914
    return;
D
dapan1121 已提交
915 916
  }

917
  void   *key = NULL;
dengyihao's avatar
dengyihao 已提交
918
  size_t  keyLen = 0;
D
dapan1121 已提交
919
  int32_t i = 0;
D
dapan1121 已提交
920
  int64_t currentMs = taosGetTimestampMs();
D
dapan1121 已提交
921 922 923

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
H
Haojun Liao 已提交
924 925
    SQWSchStatus *sch1 = (SQWSchStatus *)pIter;
    if (NULL == sch1->hbConnInfo.handle) {
D
dapan1121 已提交
926
      uint64_t *sId = taosHashGetKey(pIter, NULL);
D
dapan1121 已提交
927
      QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
D
dapan1121 已提交
928

H
Haojun Liao 已提交
929 930
      if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
          taosHashGetSize(sch1->tasksHash) <= 0) {
D
dapan1121 已提交
931 932
        taosArrayPush(pExpiredSch, sId);
      }
933

D
dapan1121 已提交
934 935 936
      pIter = taosHashIterate(mgmt->schHash, pIter);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
937

D
dapan1121 已提交
938 939 940 941 942 943 944 945 946 947 948 949 950 951 952
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
953
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
954 955
    /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
    /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
D
dapan1121 已提交
956
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
957 958
  }

D
dapan1121 已提交
959
  if (taosArrayGetSize(pExpiredSch) > 0) {
D
dapan1121 已提交
960
    qwClearExpiredSch(mgmt, pExpiredSch);
D
dapan1121 已提交
961 962
  }

wafwerar's avatar
wafwerar 已提交
963
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
964
  taosArrayDestroy(pExpiredSch);
D
dapan1121 已提交
965

dengyihao's avatar
dengyihao 已提交
966
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
967
  qwRelease(refId);
D
dapan1121 已提交
968 969
}

970
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
D
dapan1121 已提交
971 972 973 974 975 976
  int32_t        code = 0;
  SSubplan      *plan = NULL;
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx     ctx = {0};

977
  code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
D
dapan1121 已提交
978 979 980 981 982 983
  if (TSDB_CODE_SUCCESS != code) {
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

984
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
985 986 987 988 989 990 991 992 993 994
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
995 996
  ctx.taskHandle = pTaskInfo;
  ctx.sinkHandle = sinkHandle;
D
dapan1121 已提交
997

D
dapan1121 已提交
998
  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL));
D
dapan1121 已提交
999

1000
  QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, pRes));
D
dapan1121 已提交
1001 1002 1003

_return:

D
dapan1121 已提交
1004
  qwFreeTaskCtx(&ctx);
D
dapan1121 已提交
1005 1006 1007 1008

  QW_RET(TSDB_CODE_SUCCESS);
}

S
Shengliang Guan 已提交
1009
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
1010
  if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
D
dapan1121 已提交
1011 1012 1013
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
S
Shengliang 已提交
1014

D
dapan1121 已提交
1015 1016 1017 1018
  int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
  if (1 == qwNum) {
    memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
  }
D
dapan1121 已提交
1019 1020 1021 1022 1023 1024 1025 1026

  int32_t code = qwOpenRef();
  if (code) {
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
    QW_RET(code);
  }

  SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
D
dapan1121 已提交
1027
  if (NULL == mgmt) {
D
dapan1121 已提交
1028 1029
    qError("calloc %d failed", (int32_t)sizeof(SQWorker));
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1030
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1031 1032 1033 1034
  }

  if (cfg) {
    mgmt->cfg = *cfg;
D
dapan1121 已提交
1035
    if (0 == mgmt->cfg.maxSchedulerNum) {
D
dapan1121 已提交
1036
      mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
D
dapan1121 已提交
1037 1038
    }
    if (0 == mgmt->cfg.maxTaskNum) {
D
dapan1121 已提交
1039
      mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
D
dapan1121 已提交
1040 1041
    }
    if (0 == mgmt->cfg.maxSchTaskNum) {
D
dapan1121 已提交
1042
      mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1043
    }
D
dapan1121 已提交
1044
  } else {
D
dapan1121 已提交
1045 1046 1047
    mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
    mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
    mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1048 1049
  }

dengyihao's avatar
dengyihao 已提交
1050 1051
  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                               HASH_ENTRY_LOCK);
D
dapan1121 已提交
1052
  if (NULL == mgmt->schHash) {
wafwerar's avatar
wafwerar 已提交
1053
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1054
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
D
dapan1121 已提交
1055
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1056 1057
  }

dengyihao's avatar
dengyihao 已提交
1058 1059
  mgmt->ctxHash =
      taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1060
  if (NULL == mgmt->ctxHash) {
D
dapan1121 已提交
1061
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
D
dapan1121 已提交
1062 1063 1064 1065 1066 1067 1068 1069 1070
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1071 1072
  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
S
Shengliang Guan 已提交
1073
  mgmt->msgCb = *pMsgCb;
D
dapan1121 已提交
1074

D
dapan1121 已提交
1075 1076 1077 1078 1079 1080
  mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
  if (mgmt->refId < 0) {
    qError("taosAddRef qw failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(terrno);
  }

D
dapan1121 已提交
1081 1082 1083
  SQWHbParam *param = NULL;
  qwSetHbParam(mgmt->refId, &param);

1084
  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer);
D
dapan1121 已提交
1085 1086 1087 1088
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
1089

D
dapan1121 已提交
1090 1091
  *qWorkerMgmt = mgmt;

S
Shengliang Guan 已提交
1092
  qDebug("qworker initialized, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
D
dapan1121 已提交
1093

D
dapan1121 已提交
1094
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1095 1096 1097

_return:

D
dapan1121 已提交
1098 1099 1100 1101 1102 1103 1104
  if (mgmt->refId >= 0) {
    qwRelease(mgmt->refId);
  } else {
    taosHashCleanup(mgmt->schHash);
    taosHashCleanup(mgmt->ctxHash);
    taosTmrCleanUp(mgmt->timer);
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1105

1106
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1107
  }
1108

D
dapan1121 已提交
1109
  QW_RET(code);
D
dapan1121 已提交
1110
}
D
dapan1121 已提交
1111 1112 1113 1114

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1115
  }
D
dapan 已提交
1116

D
dapan1121 已提交
1117
  SQWorker *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1118

D
dapan1121 已提交
1119 1120 1121
  if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
    qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
  }
D
dapan1121 已提交
1122
}
D
dapan1121 已提交
1123

D
dapan1121 已提交
1124 1125 1126 1127 1128
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat) {
  if (NULL == handle || NULL == qWorkerMgmt || NULL == pStat) {
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

1129
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
1130
  SDataSinkStat sinkStat = {0};
1131

D
dapan1121 已提交
1132 1133
  dsDataSinkGetCacheSize(&sinkStat);
  pStat->cacheDataSize = sinkStat.cachedSize;
1134

D
dapan1121 已提交
1135 1136 1137 1138 1139
  pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed);
  pStat->cqueryProcessed = QW_STAT_GET(mgmt->stat.msgStat.cqueryProcessed);
  pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed);
  pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed);
  pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed);
D
dapan1121 已提交
1140
  pStat->deleteProcessed = QW_STAT_GET(mgmt->stat.msgStat.deleteProcessed);
D
dapan1121 已提交
1141 1142 1143 1144 1145 1146 1147

  pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE);
  pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE);
  pStat->timeInQueryQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, QUERY_QUEUE);
  pStat->timeInFetchQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, FETCH_QUEUE);

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1148
}