qworker.c 31.0 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1
#include "dataSinkMgt.h"
2
#include "executor.h"
D
dapan1121 已提交
3
#include "planner.h"
H
Haojun Liao 已提交
4
#include "query.h"
D
dapan1121 已提交
5 6
#include "qwInt.h"
#include "qwMsg.h"
dengyihao's avatar
dengyihao 已提交
7
#include "tcommon.h"
H
Haojun Liao 已提交
8
#include "tmsg.h"
9
#include "tname.h"
D
dapan1121 已提交
10
#include "qworker.h"
D
dapan1121 已提交
11

D
dapan1121 已提交
12
SQWorkerMgmt gQwMgmt = {
13 14 15
    .lock = 0,
    .qwRef = -1,
    .qwNum = 0,
D
dapan1121 已提交
16
};
17

dengyihao's avatar
dengyihao 已提交
18

D
dapan1121 已提交
19 20 21 22
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t         code = 0;
  SSchedulerHbRsp rsp = {0};
  SQWSchStatus   *sch = NULL;
D
dapan1121 已提交
23

D
dapan1121 已提交
24
  QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch));
D
dapan1121 已提交
25

D
dapan1121 已提交
26
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
27

D
dapan1121 已提交
28 29
  sch->hbBrokenTs = taosGetTimestampMs();
  
D
dapan1121 已提交
30 31 32 33
  if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
    sch->hbConnInfo.handle = NULL;
    sch->hbConnInfo.ahandle = NULL;
D
dapan1121 已提交
34

D
dapan1121 已提交
35 36 37
    QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
  } else {
    QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
D
dapan1121 已提交
38
  }
D
dapan1121 已提交
39

D
dapan1121 已提交
40
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
41

D
dapan1121 已提交
42
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
43

D
dapan1121 已提交
44
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
45 46
}

D
dapan1121 已提交
47
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
D
dapan1121 已提交
48
  qTaskInfo_t taskHandle = ctx->taskHandle;
D
dapan1121 已提交
49

D
dapan1121 已提交
50
  if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
D
dapan1121 已提交
51 52
    if (ctx->explain) {
      SExplainExecInfo *execInfo = NULL;
dengyihao's avatar
dengyihao 已提交
53
      int32_t           resNum = 0;
D
dapan1121 已提交
54
      QW_ERR_RET(qGetExplainExecInfo(taskHandle, &resNum, &execInfo));
55 56 57 58

      SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
      connInfo.ahandle = NULL;
      QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
D
dapan1121 已提交
59
    }
D
dapan1121 已提交
60 61 62 63

    if (!ctx->needFetch) {
      dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL);
    }
D
dapan1121 已提交
64 65 66 67 68
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
69
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
dengyihao's avatar
dengyihao 已提交
70 71
  int32_t        code = 0;
  bool           qcontinue = true;
72
  SSDataBlock   *pRes = NULL;
dengyihao's avatar
dengyihao 已提交
73 74 75
  uint64_t       useconds = 0;
  int32_t        i = 0;
  int32_t        execNum = 0;
D
dapan1121 已提交
76
  qTaskInfo_t    taskHandle = ctx->taskHandle;
D
dapan1121 已提交
77
  DataSinkHandle sinkHandle = ctx->sinkHandle;
dengyihao's avatar
dengyihao 已提交
78

D
dapan1121 已提交
79
  while (true) {
H
Haojun Liao 已提交
80
    QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
D
dapan1121 已提交
81

D
dapan1121 已提交
82 83 84 85 86 87
    pRes = NULL;

    // if *taskHandle is NULL, it's killed right now
    if (taskHandle) {
      code = qExecTask(taskHandle, &pRes, &useconds);
      if (code) {
88 89 90 91 92
        if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
          QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        } else {
          QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        }
D
dapan1121 已提交
93 94
        QW_ERR_RET(code);
      }
D
dapan1121 已提交
95 96
    }

D
dapan1121 已提交
97 98
    ++execNum;

D
dapan1121 已提交
99
    if (NULL == pRes) {
dengyihao's avatar
dengyihao 已提交
100
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
H
Haojun Liao 已提交
101

D
dapan1121 已提交
102
      dsEndPut(sinkHandle, useconds);
D
dapan1121 已提交
103 104

      QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
D
dapan1121 已提交
105 106 107 108

      if (queryEnd) {
        *queryEnd = true;
      }
dengyihao's avatar
dengyihao 已提交
109

D
dapan1121 已提交
110 111 112
      break;
    }

D
dapan1121 已提交
113
    int32_t rows = pRes->info.rows;
H
Haojun Liao 已提交
114

115 116
    ASSERT(pRes->info.rows > 0);

H
Haojun Liao 已提交
117
    SInputData inputData = {.pData = pRes};
D
dapan1121 已提交
118 119
    code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
    if (code) {
D
dapan1121 已提交
120 121
      QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
D
dapan1121 已提交
122
    }
D
dapan1121 已提交
123

D
dapan1121 已提交
124
    QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);
dengyihao's avatar
dengyihao 已提交
125

D
dapan1121 已提交
126 127 128 129
    if (!qcontinue) {
      break;
    }

D
dapan1121 已提交
130
    if (ctx->needFetch && (!ctx->queryRsped) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
D
dapan1121 已提交
131 132 133
      break;
    }

D
dapan1121 已提交
134
    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
135 136
      break;
    }
D
dapan1121 已提交
137

D
dapan1121 已提交
138 139 140
    if (atomic_load_32(&ctx->rspCode)) {
      break;
    }
D
dapan1121 已提交
141 142 143 144
  }

  QW_RET(code);
}
D
dapan1121 已提交
145

D
dapan1121 已提交
146
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
D
dapan1121 已提交
147 148
  int32_t taskNum = 0;

D
dapan1121 已提交
149
  hbInfo->connInfo = sch->hbConnInfo;
D
dapan1121 已提交
150
  hbInfo->rsp.epId = sch->hbEpId;
D
dapan1121 已提交
151

D
dapan1121 已提交
152
  QW_LOCK(QW_READ, &sch->tasksLock);
dengyihao's avatar
dengyihao 已提交
153

D
dapan1121 已提交
154
  taskNum = taosHashGetSize(sch->tasksHash);
D
dapan1121 已提交
155 156 157

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
D
dapan1121 已提交
158
    QW_UNLOCK(QW_READ, &sch->tasksLock);
D
dapan1121 已提交
159
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
D
dapan1121 已提交
160 161 162
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

163
  void       *key = NULL;
dengyihao's avatar
dengyihao 已提交
164 165
  size_t      keyLen = 0;
  int32_t     i = 0;
D
dapan1121 已提交
166
  STaskStatus status = {0};
D
dapan1121 已提交
167 168 169 170

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
D
dapan1121 已提交
171
    key = taosHashGetKey(pIter, &keyLen);
D
dapan1121 已提交
172

dengyihao's avatar
dengyihao 已提交
173
    // TODO GET EXECUTOR API TO GET MORE INFO
D
dapan1121 已提交
174

D
dapan1121 已提交
175
    QW_GET_QTID(key, status.queryId, status.taskId, status.execId);
D
dapan1121 已提交
176 177
    status.status = taskStatus->status;
    status.refId = taskStatus->refId;
dengyihao's avatar
dengyihao 已提交
178

D
dapan1121 已提交
179
    taosArrayPush(hbInfo->rsp.taskStatus, &status);
dengyihao's avatar
dengyihao 已提交
180

D
dapan1121 已提交
181 182
    ++i;
    pIter = taosHashIterate(sch->tasksHash, pIter);
dengyihao's avatar
dengyihao 已提交
183
  }
D
dapan1121 已提交
184 185 186 187 188 189

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
190
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
D
dapan1121 已提交
191
  int64_t            len = 0;
D
dapan1121 已提交
192
  SRetrieveTableRsp *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
193 194
  bool               queryEnd = false;
  int32_t            code = 0;
D
dapan1121 已提交
195

D
dapan1121 已提交
196
  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
D
dapan1121 已提交
197

D
dapan1121 已提交
198 199 200 201
  if (len < 0) {
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
D
dapan1121 已提交
202

D
dapan1121 已提交
203 204
  if (len == 0) {
    if (queryEnd) {
D
dapan 已提交
205
      code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
206
      if (code) {
D
dapan1121 已提交
207
        QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
208 209
        QW_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
210

D
dapan1121 已提交
211
      QW_TASK_DLOG_E("no data in sink and query end");
H
Haojun Liao 已提交
212

D
dapan1121 已提交
213
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
L
Liu Jicong 已提交
214
      QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
215

D
dapan1121 已提交
216
      *rspMsg = rsp;
D
dapan 已提交
217
      *dataLen = 0;
D
dapan1121 已提交
218 219
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
220 221

    pOutput->bufStatus = DS_BUF_EMPTY;
D
dapan1121 已提交
222

D
dapan1121 已提交
223
    return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
224
  }
D
dapan1121 已提交
225

D
dapan1121 已提交
226
  // Got data from sink
D
dapan1121 已提交
227
  QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
D
dapan1121 已提交
228

D
dapan 已提交
229
  *dataLen = len;
dengyihao's avatar
dengyihao 已提交
230

D
dapan1121 已提交
231
  QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
D
dapan 已提交
232
  *rspMsg = rsp;
dengyihao's avatar
dengyihao 已提交
233

D
dapan 已提交
234 235
  pOutput->pData = rsp->data;
  code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
236
  if (code) {
D
dapan1121 已提交
237
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
238 239
    QW_ERR_RET(code);
  }
D
dapan1121 已提交
240

D
dapan1121 已提交
241
  if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
D
dapan1121 已提交
242
    QW_TASK_DLOG_E("task all data fetched, done");
D
dapan1121 已提交
243
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
D
dapan1121 已提交
244 245
  }

D
dapan1121 已提交
246
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
247 248
}

249
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
D
dapan1121 已提交
250
  int64_t            len = 0;
D
dapan1121 已提交
251 252 253 254 255 256
  bool               queryEnd = false;
  int32_t            code = 0;
  SOutputData        output = {0};

  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);

D
dapan1121 已提交
257
  if (len <= 0 || len != sizeof(SDeleterRes)) {
D
dapan1121 已提交
258
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
D
dapan1121 已提交
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  output.pData = taosMemoryCalloc(1, len);
  if (NULL == output.pData) {
    QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  
  code = dsGetDataBlock(ctx->sinkHandle, &output);
  if (code) {
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
    taosMemoryFree(output.pData);
    QW_ERR_RET(code);
  }

D
dapan1121 已提交
274 275
  SDeleterRes* pDelRes = (SDeleterRes*)output.pData;
  
D
dapan1121 已提交
276
  pRes->suid = pDelRes->suid;
D
dapan1121 已提交
277 278 279
  pRes->uidList = pDelRes->uidList;
  pRes->skey = pDelRes->skey;
  pRes->ekey = pDelRes->ekey;
280
  pRes->affectedRows = pDelRes->affectedRows;
D
dapan1121 已提交
281 282 283 284
  
  return TSDB_CODE_SUCCESS;
}

285

D
dapan1121 已提交
286
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
287 288
  int32_t         code = 0;
  SQWTaskCtx     *ctx = NULL;
D
dapan1121 已提交
289

D
dapan1121 已提交
290
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
291

D
dapan1121 已提交
292
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
293

D
dapan1121 已提交
294
  QW_LOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
295

D
dapan1121 已提交
296
  if (QW_PHASE_PRE_FETCH == phase) {
dengyihao's avatar
dengyihao 已提交
297
    atomic_store_8((int8_t *)&ctx->queryFetched, true);
D
dapan1121 已提交
298
  } else {
D
dapan1121 已提交
299 300
    atomic_store_8(&ctx->phase, phase);
  }
D
dapan1121 已提交
301

dengyihao's avatar
dengyihao 已提交
302
  if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
D
dapan1121 已提交
303 304 305
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }
D
dapan1121 已提交
306

D
dapan1121 已提交
307 308
  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
D
dapan1121 已提交
309
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
H
Haojun Liao 已提交
310
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
311
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
312 313
        break;
      }
D
dapan1121 已提交
314

D
dapan1121 已提交
315
      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
316
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
317

D
dapan1121 已提交
318 319
        //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        //QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
320

D
dapan1121 已提交
321
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan 已提交
322
        break;
D
dapan1121 已提交
323
      }
D
dapan1121 已提交
324

D
dapan1121 已提交
325
      QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
D
dapan1121 已提交
326 327
      break;
    }
D
dapan1121 已提交
328
    case QW_PHASE_PRE_FETCH: {
D
dapan1121 已提交
329
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
330
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
331 332
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }
D
dapan1121 已提交
333

D
dapan1121 已提交
334
      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
335
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
336 337 338
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

D
dapan1121 已提交
339
      if (!ctx->queryRsped) {
D
dapan1121 已提交
340
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
341
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
D
dapan1121 已提交
342 343
      }
      break;
dengyihao's avatar
dengyihao 已提交
344
    }
D
dapan1121 已提交
345
    case QW_PHASE_PRE_CQUERY: {
D
dapan1121 已提交
346
      if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
347
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
348
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
349
      }
D
dapan1121 已提交
350

D
dapan1121 已提交
351
      if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
352
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
H
Haojun Liao 已提交
353

D
dapan1121 已提交
354 355
        //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        //QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
356

D
dapan1121 已提交
357
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
358
      }
D
dapan1121 已提交
359

D
dapan1121 已提交
360
      break;
D
dapan1121 已提交
361 362 363 364 365 366 367
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
368
    QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode));
D
dapan1121 已提交
369
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
370
  }
D
dapan1121 已提交
371

D
dapan1121 已提交
372 373
_return:
  if (ctx) {
D
dapan1121 已提交
374
    QW_UPDATE_RSP_CODE(ctx, code);
dengyihao's avatar
dengyihao 已提交
375

D
dapan1121 已提交
376
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
377 378
    qwReleaseTaskCtx(mgmt, ctx);
  }
D
dapan1121 已提交
379

380 381 382 383 384
  if (code != TSDB_CODE_SUCCESS) {
    QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  } else {
    QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  }
D
dapan1121 已提交
385 386 387 388 389

  QW_RET(code);
}

int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
S
Shengliang Guan 已提交
390 391 392
  int32_t         code = 0;
  SQWTaskCtx     *ctx = NULL;
  SRpcHandleInfo  connInfo = {0};
D
dapan1121 已提交
393
  SRpcHandleInfo *rspConnection = NULL;
D
dapan1121 已提交
394

D
dapan1121 已提交
395
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
dengyihao's avatar
dengyihao 已提交
396

D
dapan1121 已提交
397
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
398

D
dapan1121 已提交
399 400
  QW_LOCK(QW_WRITE, &ctx->lock);

D
dapan1121 已提交
401
  if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
402
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
403 404 405 406
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (QW_PHASE_POST_QUERY == phase) {
407
    connInfo = ctx->ctrlConnInfo;
D
dapan1121 已提交
408
    rspConnection = &connInfo;
dengyihao's avatar
dengyihao 已提交
409

D
dapan1121 已提交
410
    ctx->queryRsped = true;
D
dapan1121 已提交
411 412
  }

D
dapan1121 已提交
413
  if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
414 415 416 417
    if (QW_PHASE_POST_FETCH == phase) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }
D
dapan1121 已提交
418

D
dapan1121 已提交
419 420
    //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    //QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
421

D
dapan1121 已提交
422 423
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
424 425 426
  }

  if (ctx->rspCode) {
dengyihao's avatar
dengyihao 已提交
427 428
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
D
dapan1121 已提交
429
    QW_ERR_JRET(ctx->rspCode);
dengyihao's avatar
dengyihao 已提交
430
  }
D
dapan1121 已提交
431

D
dapan1121 已提交
432
  QW_ERR_JRET(input->code);
D
dapan1121 已提交
433 434 435

_return:

D
dapan1121 已提交
436
  if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
D
dapan1121 已提交
437
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
D
dapan1121 已提交
438 439
  }

D
dapan1121 已提交
440
  if (rspConnection) {
D
dapan1121 已提交
441
    qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx);
D
dapan1121 已提交
442
    QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code));
443 444
  }

D
dapan1121 已提交
445
  if (ctx) {
D
dapan1121 已提交
446
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
447

D
dapan1121 已提交
448 449 450
    if (QW_PHASE_POST_FETCH != phase) {
      atomic_store_8(&ctx->phase, phase);
    }
dengyihao's avatar
dengyihao 已提交
451

D
dapan1121 已提交
452
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
453
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
454 455
  }

D
dapan1121 已提交
456
  if (code) {
D
dapan1121 已提交
457
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
D
dapan1121 已提交
458 459
  }

D
dapan1121 已提交
460
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
461

D
dapan1121 已提交
462 463 464
  QW_RET(code);
}

D
dapan1121 已提交
465 466 467 468 469 470 471
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
  QW_ERR_RET(qwDropTask(QW_FPARAMS()));

  QW_RET(TSDB_CODE_SUCCESS);
}


D
dapan1121 已提交
472
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
473 474 475 476 477 478 479
  int32_t        code = 0;
  bool           queryRsped = false;
  SSubplan      *plan = NULL;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx    *ctx = NULL;
D
dapan1121 已提交
480

D
dapan1121 已提交
481
  QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
D
dapan1121 已提交
482

D
dapan1121 已提交
483 484 485
  QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));

  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
486

D
dapan1121 已提交
487 488
  ctx->ctrlConnInfo = qwMsg->connInfo;

D
dapan1121 已提交
489
  QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
D
dapan1121 已提交
490

D
dapan1121 已提交
491 492
  qwDbgResponseRedirect(qwMsg, ctx);

D
dapan1121 已提交
493 494 495 496 497 498 499 500 501 502 503
_return:

  if (ctx) {
    QW_UPDATE_RSP_CODE(ctx, code);
    qwReleaseTaskCtx(mgmt, ctx);
  }

  QW_RET(TSDB_CODE_SUCCESS);
}


D
dapan1121 已提交
504
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
D
dapan1121 已提交
505 506 507 508 509 510 511 512
  int32_t        code = 0;
  bool           queryRsped = false;
  SSubplan      *plan = NULL;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx    *ctx = NULL;

D
dapan1121 已提交
513
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
D
dapan1121 已提交
514 515

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
516

D
dapan1121 已提交
517 518 519
  ctx->taskType = qwMsg->msgInfo.taskType;
  ctx->explain = qwMsg->msgInfo.explain;
  ctx->needFetch = qwMsg->msgInfo.needFetch;
D
dapan1121 已提交
520
  ctx->queryType = qwMsg->msgType;
X
Xiaoyu Wang 已提交
521

D
dapan1121 已提交
522
  //QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
X
Xiaoyu Wang 已提交
523

D
dapan1121 已提交
524 525
  code = qStringToSubplan(qwMsg->msg, &plan);
  if (TSDB_CODE_SUCCESS != code) {
526 527
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
528
    QW_ERR_JRET(code);
D
dapan1121 已提交
529
  }
dengyihao's avatar
dengyihao 已提交
530

D
dapan1121 已提交
531 532
  ctx->plan = plan;

533
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
534
  if (code) {
D
dapan1121 已提交
535
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
536
    QW_ERR_JRET(code);
D
dapan1121 已提交
537
  }
D
dapan1121 已提交
538

H
Haojun Liao 已提交
539
  if (NULL == sinkHandle || NULL == pTaskInfo) {
D
dapan1121 已提交
540 541 542 543
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

dengyihao's avatar
dengyihao 已提交
544 545
  // QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
  // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
546

dengyihao's avatar
dengyihao 已提交
547
  // queryRsped = true;
D
dapan1121 已提交
548

D
dapan1121 已提交
549 550 551
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

D
dapan1121 已提交
552
  if (pTaskInfo && sinkHandle) {
553
    qwSaveTbVersionInfo(pTaskInfo, ctx);
D
dapan1121 已提交
554
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
D
dapan1121 已提交
555
  }
dengyihao's avatar
dengyihao 已提交
556

D
dapan1121 已提交
557 558
_return:

D
dapan1121 已提交
559
  input.code = code;
D
dapan1121 已提交
560
  input.msgType = qwMsg->msgType;
D
dapan1121 已提交
561
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
562

dengyihao's avatar
dengyihao 已提交
563
  // if (!queryRsped) {
D
dapan1121 已提交
564 565 566
  //  qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
  //  QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  //}
D
dapan1121 已提交
567

D
dapan1121 已提交
568
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
569 570
}

D
dapan1121 已提交
571
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
572
  SQWTaskCtx   *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
573
  int32_t       code = 0;
574
  SQWPhaseInput input = {0};
575
  void         *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
576 577 578
  int32_t       dataLen = 0;
  bool          queryEnd = false;

D
dapan1121 已提交
579
  do {
D
dapan1121 已提交
580
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
D
dapan1121 已提交
581

D
dapan1121 已提交
582
    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
583

dengyihao's avatar
dengyihao 已提交
584 585
    atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t *)&ctx->queryContinue, 0);
D
dapan1121 已提交
586

D
dapan1121 已提交
587
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));
D
dapan1121 已提交
588

D
dapan1121 已提交
589
    if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
590
      SOutputData sOutput = {0};
D
dapan1121 已提交
591
      QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
dengyihao's avatar
dengyihao 已提交
592 593

      if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
594
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
dengyihao's avatar
dengyihao 已提交
595 596

        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
597
      }
dengyihao's avatar
dengyihao 已提交
598

D
dapan1121 已提交
599
      if (rsp) {
D
dapan1121 已提交
600
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
601

D
dapan1121 已提交
602
        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
603
        if (qComplete) {
dengyihao's avatar
dengyihao 已提交
604
          atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
605
        }
H
Haojun Liao 已提交
606

D
dapan1121 已提交
607
        qwMsg->connInfo = ctx->dataConnInfo;
dengyihao's avatar
dengyihao 已提交
608 609
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

D
dapan1121 已提交
610
        qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
D
dapan1121 已提交
611 612
        rsp = NULL;
        
dengyihao's avatar
dengyihao 已提交
613 614
        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
                     tstrerror(code), dataLen);
D
dapan1121 已提交
615
      } else {
dengyihao's avatar
dengyihao 已提交
616
        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
617 618 619
      }
    }

dengyihao's avatar
dengyihao 已提交
620
  _return:
621

D
dapan1121 已提交
622 623 624 625
    if (NULL == ctx) {
      break;
    }

D
dapan1121 已提交
626
    if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
dengyihao's avatar
dengyihao 已提交
627
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
628 629
      qwFreeFetchRsp(rsp);
      rsp = NULL;
dengyihao's avatar
dengyihao 已提交
630

D
dapan1121 已提交
631
      qwMsg->connInfo = ctx->dataConnInfo;
D
dapan1121 已提交
632
      qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code);
dengyihao's avatar
dengyihao 已提交
633 634
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                   0);
635
    }
D
dapan1121 已提交
636

D
dapan1121 已提交
637
    QW_LOCK(QW_WRITE, &ctx->lock);
dengyihao's avatar
dengyihao 已提交
638
    if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
D
dapan1121 已提交
639
      // Note: if necessary, fetch need to put cquery to queue again
D
dapan1121 已提交
640
      atomic_store_8(&ctx->phase, 0);
dengyihao's avatar
dengyihao 已提交
641
      QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
642 643
      break;
    }
dengyihao's avatar
dengyihao 已提交
644
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
645
  } while (true);
D
dapan1121 已提交
646

D
dapan1121 已提交
647
  input.code = code;
dengyihao's avatar
dengyihao 已提交
648
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);
D
dapan1121 已提交
649

dengyihao's avatar
dengyihao 已提交
650
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
651
}
D
dapan1121 已提交
652

D
dapan1121 已提交
653
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
654 655 656
  int32_t       code = 0;
  int32_t       dataLen = 0;
  bool          locked = false;
657 658
  SQWTaskCtx   *ctx = NULL;
  void         *rsp = NULL;
D
dapan1121 已提交
659
  SQWPhaseInput input = {0};
D
dapan1121 已提交
660

D
dapan1121 已提交
661
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
662

663
  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
664

D
dapan1121 已提交
665 666
  ctx->queryType = qwMsg->msgType;

D
dapan 已提交
667
  SOutputData sOutput = {0};
D
dapan1121 已提交
668
  QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
D
dapan1121 已提交
669

670
  if (NULL == rsp) {
D
dapan1121 已提交
671
    ctx->dataConnInfo = qwMsg->connInfo;
dengyihao's avatar
dengyihao 已提交
672

673
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
D
dapan1121 已提交
674
  } else {
D
dapan1121 已提交
675
    bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
676

D
dapan1121 已提交
677
    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
678
    if (qComplete) {
dengyihao's avatar
dengyihao 已提交
679
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
680
    }
D
dapan1121 已提交
681 682
  }

dengyihao's avatar
dengyihao 已提交
683
  if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
684
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
D
dapan1121 已提交
685

D
dapan1121 已提交
686 687
    QW_LOCK(QW_WRITE, &ctx->lock);
    locked = true;
688

D
dapan1121 已提交
689
    // RC WARNING
D
dapan1121 已提交
690
    if (QW_QUERY_RUNNING(ctx)) {
dengyihao's avatar
dengyihao 已提交
691 692
      atomic_store_8((int8_t *)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
D
dapan1121 已提交
693
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);
D
dapan1121 已提交
694

dengyihao's avatar
dengyihao 已提交
695 696
      atomic_store_8((int8_t *)&ctx->queryInQueue, 1);

D
dapan1121 已提交
697
      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
698
    }
D
dapan 已提交
699
  }
dengyihao's avatar
dengyihao 已提交
700

D
dapan1121 已提交
701
_return:
D
dapan1121 已提交
702

D
dapan1121 已提交
703 704 705 706 707
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
D
dapan1121 已提交
708
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
D
dapan1121 已提交
709

D
dapan 已提交
710 711 712
  if (code) {
    qwFreeFetchRsp(rsp);
    rsp = NULL;
D
dapan1121 已提交
713
    dataLen = 0;
D
dapan1121 已提交
714 715 716
  }

  if (code || rsp) {
D
dapan1121 已提交
717
    qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
D
dapan1121 已提交
718
    QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code),
dengyihao's avatar
dengyihao 已提交
719
                 dataLen);
D
dapan1121 已提交
720 721
  }

D
dapan1121 已提交
722
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
723
}
D
dapan1121 已提交
724

D
dapan1121 已提交
725
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
726 727
  int32_t     code = 0;
  bool        rsped = false;
D
dapan1121 已提交
728
  SQWTaskCtx *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
729
  bool        locked = false;
D
dapan1121 已提交
730

D
dapan1121 已提交
731
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
732

D
dapan1121 已提交
733 734 735 736
  QW_LOCK(QW_WRITE, &ctx->lock);

  locked = true;

D
dapan1121 已提交
737
  if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
738
    QW_TASK_WLOG_E("task already dropping");
D
dapan1121 已提交
739 740 741
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

D
dapan1121 已提交
742
  if (QW_QUERY_RUNNING(ctx)) {
D
dapan1121 已提交
743
    QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
D
dapan1121 已提交
744
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
D
dapan1121 已提交
745
  } else if (ctx->phase > 0) {
D
dapan1121 已提交
746
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
747
    rsped = true;
D
dapan1121 已提交
748 749
  } else {
    // task not started
D
dapan1121 已提交
750
  }
D
dapan1121 已提交
751

D
dapan1121 已提交
752
  if (!rsped) {
753
    ctx->ctrlConnInfo = qwMsg->connInfo;
dengyihao's avatar
dengyihao 已提交
754

D
dapan1121 已提交
755 756
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }
757

D
dapan1121 已提交
758
_return:
D
dapan1121 已提交
759

D
dapan1121 已提交
760
  if (code) {
D
dapan1121 已提交
761 762 763
    if (ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }
H
Haojun Liao 已提交
764

D
dapan1121 已提交
765
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
D
dapan1121 已提交
766 767
  }

D
dapan 已提交
768 769 770 771
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

D
dapan1121 已提交
772
  if (ctx) {
D
dapan1121 已提交
773
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
774 775
  }

D
dapan1121 已提交
776 777 778
  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
779
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
dengyihao's avatar
dengyihao 已提交
780
  int32_t         code = 0;
D
dapan1121 已提交
781
  SSchedulerHbRsp rsp = {0};
782
  SQWSchStatus   *sch = NULL;
D
dapan1121 已提交
783

D
dapan1121 已提交
784 785 786
  if (qwMsg->code) {
    QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
  }
D
dapan1121 已提交
787 788

  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
D
dapan1121 已提交
789 790
  QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));

D
dapan1121 已提交
791 792
  sch->hbBrokenTs = 0;

D
dapan1121 已提交
793
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
794

D
dapan1121 已提交
795
  if (sch->hbConnInfo.handle) {
796
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
797
    sch->hbConnInfo.handle = NULL;
D
dapan1121 已提交
798
  }
D
dapan1121 已提交
799

D
dapan1121 已提交
800
  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
D
dapan1121 已提交
801
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
dengyihao's avatar
dengyihao 已提交
802

D
dapan1121 已提交
803
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
dengyihao's avatar
dengyihao 已提交
804 805 806

  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
          req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
807

D
dapan1121 已提交
808 809 810 811
  qwReleaseScheduler(QW_READ, mgmt);

_return:

D
dapan1121 已提交
812 813
  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));

S
shm  
Shengliang Guan 已提交
814
  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
D
dapan1121 已提交
815 816

  if (code) {
817
    tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
818
    qwMsg->connInfo.handle = NULL;
D
dapan1121 已提交
819
  }
dengyihao's avatar
dengyihao 已提交
820

821
  /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/
dengyihao's avatar
dengyihao 已提交
822

D
dapan1121 已提交
823
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
824 825 826
}

void qwProcessHbTimerEvent(void *param, void *tmrId) {
827
  SQWHbParam *hbParam = (SQWHbParam *)param;
D
dapan1121 已提交
828 829 830 831
  if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
    return;
  }

832
  int64_t   refId = hbParam->refId;
D
dapan1121 已提交
833 834 835 836 837
  SQWorker *mgmt = qwAcquire(refId);
  if (NULL == mgmt) {
    QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
    return;
  }
838

D
dapan1121 已提交
839
  SQWSchStatus *sch = NULL;
dengyihao's avatar
dengyihao 已提交
840
  int32_t       taskNum = 0;
841
  SQWHbInfo    *rspList = NULL;
D
dapan1121 已提交
842
  SArray       *pExpiredSch = NULL;
dengyihao's avatar
dengyihao 已提交
843
  int32_t       code = 0;
D
dapan1121 已提交
844

D
dapan1121 已提交
845 846
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
847 848 849 850 851
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
852
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
853
    qwRelease(refId);
D
dapan1121 已提交
854
    return;
D
dapan1121 已提交
855 856
  }

wafwerar's avatar
wafwerar 已提交
857
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
858 859
  pExpiredSch = taosArrayInit(schNum, sizeof(uint64_t));
  if (NULL == rspList || NULL == pExpiredSch) {
D
dapan1121 已提交
860
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
861 862
    taosMemoryFree(rspList);
    taosArrayDestroy(pExpiredSch);
D
dapan1121 已提交
863 864
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
865
    qwRelease(refId);
D
dapan1121 已提交
866
    return;
D
dapan1121 已提交
867 868
  }

869
  void   *key = NULL;
dengyihao's avatar
dengyihao 已提交
870
  size_t  keyLen = 0;
D
dapan1121 已提交
871
  int32_t i = 0;
D
dapan1121 已提交
872
  int64_t currentMs = taosGetTimestampMs();
D
dapan1121 已提交
873 874 875

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
D
dapan1121 已提交
876 877 878
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
    if (NULL == sch->hbConnInfo.handle) {
      uint64_t *sId = taosHashGetKey(pIter, NULL);
D
dapan1121 已提交
879
      QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
D
dapan1121 已提交
880 881 882 883 884

      if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && taosHashGetSize(sch->tasksHash) <= 0) {
        taosArrayPush(pExpiredSch, sId);
      }
      
D
dapan1121 已提交
885 886 887
      pIter = taosHashIterate(mgmt->schHash, pIter);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
888

D
dapan1121 已提交
889 890 891 892 893 894 895 896 897 898 899 900 901 902 903
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
904
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
905 906
    /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
    /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
D
dapan1121 已提交
907
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
908 909
  }

D
dapan1121 已提交
910
  if (taosArrayGetSize(pExpiredSch) > 0) {
D
dapan1121 已提交
911
    qwClearExpiredSch(mgmt, pExpiredSch);
D
dapan1121 已提交
912 913
  }

wafwerar's avatar
wafwerar 已提交
914
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
915
  taosArrayDestroy(pExpiredSch);
D
dapan1121 已提交
916

dengyihao's avatar
dengyihao 已提交
917
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
918
  qwRelease(refId);
D
dapan1121 已提交
919 920
}

921
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
D
dapan1121 已提交
922 923 924 925 926 927 928 929 930 931 932 933 934
  int32_t        code = 0;
  SSubplan      *plan = NULL;
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx     ctx = {0};

  code = qStringToSubplan(qwMsg->msg, &plan);
  if (TSDB_CODE_SUCCESS != code) {
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

D
dapan1121 已提交
935
  ctx.plan = plan;
D
dapan1121 已提交
936
  
937
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
938 939 940 941 942 943 944 945 946 947
  if (code) {
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
    QW_ERR_JRET(code);
  }

  if (NULL == sinkHandle || NULL == pTaskInfo) {
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

D
dapan1121 已提交
948 949
  ctx.taskHandle = pTaskInfo;
  ctx.sinkHandle = sinkHandle;
D
dapan1121 已提交
950

D
dapan1121 已提交
951
  QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL));
D
dapan1121 已提交
952

953
  QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, pRes));
D
dapan1121 已提交
954 955 956 957 958 959 960 961

_return:

  qwFreeTaskCtx(QW_FPARAMS(), &ctx);

  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
962

S
Shengliang Guan 已提交
963
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
964
  if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
D
dapan1121 已提交
965 966 967
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
S
Shengliang 已提交
968

D
dapan1121 已提交
969 970 971 972
  int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
  if (1 == qwNum) {
    memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
  }
D
dapan1121 已提交
973 974 975 976 977 978 979 980

  int32_t code = qwOpenRef();
  if (code) {
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
    QW_RET(code);
  }

  SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
D
dapan1121 已提交
981
  if (NULL == mgmt) {
D
dapan1121 已提交
982 983
    qError("calloc %d failed", (int32_t)sizeof(SQWorker));
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
984
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
985 986 987 988
  }

  if (cfg) {
    mgmt->cfg = *cfg;
D
dapan1121 已提交
989
    if (0 == mgmt->cfg.maxSchedulerNum) {
D
dapan1121 已提交
990
      mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
D
dapan1121 已提交
991 992
    }
    if (0 == mgmt->cfg.maxTaskNum) {
D
dapan1121 已提交
993
      mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
D
dapan1121 已提交
994 995
    }
    if (0 == mgmt->cfg.maxSchTaskNum) {
D
dapan1121 已提交
996
      mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
997
    }
D
dapan1121 已提交
998
  } else {
D
dapan1121 已提交
999 1000 1001
    mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
    mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
    mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
1002 1003
  }

dengyihao's avatar
dengyihao 已提交
1004 1005
  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                               HASH_ENTRY_LOCK);
D
dapan1121 已提交
1006
  if (NULL == mgmt->schHash) {
wafwerar's avatar
wafwerar 已提交
1007
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1008
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
D
dapan1121 已提交
1009
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
1010 1011
  }

dengyihao's avatar
dengyihao 已提交
1012 1013
  mgmt->ctxHash =
      taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1014
  if (NULL == mgmt->ctxHash) {
D
dapan1121 已提交
1015
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
D
dapan1121 已提交
1016 1017 1018 1019 1020 1021 1022 1023 1024
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
1025 1026
  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
S
Shengliang Guan 已提交
1027
  mgmt->msgCb = *pMsgCb;
D
dapan1121 已提交
1028

D
dapan1121 已提交
1029 1030 1031 1032 1033 1034
  mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
  if (mgmt->refId < 0) {
    qError("taosAddRef qw failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(terrno);
  }

D
dapan1121 已提交
1035 1036 1037
  SQWHbParam *param = NULL;
  qwSetHbParam(mgmt->refId, &param);

1038
  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer);
D
dapan1121 已提交
1039 1040 1041 1042
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
1043

D
dapan1121 已提交
1044 1045
  *qWorkerMgmt = mgmt;

S
Shengliang Guan 已提交
1046
  qDebug("qworker initialized, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
D
dapan1121 已提交
1047

D
dapan1121 已提交
1048
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1049 1050 1051

_return:

D
dapan1121 已提交
1052 1053 1054 1055 1056 1057 1058
  if (mgmt->refId >= 0) {
    qwRelease(mgmt->refId);
  } else {
    taosHashCleanup(mgmt->schHash);
    taosHashCleanup(mgmt->ctxHash);
    taosTmrCleanUp(mgmt->timer);
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
1059

1060
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
1061
  }
1062

D
dapan1121 已提交
1063
  QW_RET(code);
D
dapan1121 已提交
1064
}
D
dapan1121 已提交
1065 1066 1067 1068

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
1069
  }
D
dapan 已提交
1070

D
dapan1121 已提交
1071
  SQWorker *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
1072

D
dapan1121 已提交
1073 1074 1075
  if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
    qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
  }
D
dapan1121 已提交
1076
}
D
dapan1121 已提交
1077

D
dapan1121 已提交
1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat) {
  if (NULL == handle || NULL == qWorkerMgmt || NULL == pStat) {
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
  SDataSinkStat sinkStat = {0};
  
  dsDataSinkGetCacheSize(&sinkStat);
  pStat->cacheDataSize = sinkStat.cachedSize;
  
  pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed);
  pStat->cqueryProcessed = QW_STAT_GET(mgmt->stat.msgStat.cqueryProcessed);
  pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed);
  pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed);
  pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed);
D
dapan1121 已提交
1094
  pStat->deleteProcessed = QW_STAT_GET(mgmt->stat.msgStat.deleteProcessed);
D
dapan1121 已提交
1095 1096 1097 1098 1099 1100 1101

  pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE);
  pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE);
  pStat->timeInQueryQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, QUERY_QUEUE);
  pStat->timeInFetchQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, FETCH_QUEUE);

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1102 1103 1104
}


dengyihao's avatar
dengyihao 已提交
1105