qworker.c 27.8 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1
#include "dataSinkMgt.h"
2
#include "executor.h"
D
dapan1121 已提交
3
#include "planner.h"
H
Haojun Liao 已提交
4
#include "query.h"
D
dapan1121 已提交
5 6
#include "qwInt.h"
#include "qwMsg.h"
dengyihao's avatar
dengyihao 已提交
7
#include "tcommon.h"
H
Haojun Liao 已提交
8
#include "tmsg.h"
9
#include "tname.h"
D
dapan1121 已提交
10
#include "qworker.h"
D
dapan1121 已提交
11

D
dapan1121 已提交
12
SQWorkerMgmt gQwMgmt = {
13 14 15
    .lock = 0,
    .qwRef = -1,
    .qwNum = 0,
D
dapan1121 已提交
16
};
17

dengyihao's avatar
dengyihao 已提交
18

D
dapan1121 已提交
19 20 21 22
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
  int32_t         code = 0;
  SSchedulerHbRsp rsp = {0};
  SQWSchStatus   *sch = NULL;
D
dapan1121 已提交
23

D
dapan1121 已提交
24
  QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
D
dapan1121 已提交
25

D
dapan1121 已提交
26
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
27

D
dapan1121 已提交
28 29 30 31
  if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
    sch->hbConnInfo.handle = NULL;
    sch->hbConnInfo.ahandle = NULL;
D
dapan1121 已提交
32

D
dapan1121 已提交
33 34 35
    QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
  } else {
    QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
D
dapan1121 已提交
36
  }
D
dapan1121 已提交
37

D
dapan1121 已提交
38
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
39

D
dapan1121 已提交
40
  qwReleaseScheduler(QW_READ, mgmt);
D
dapan1121 已提交
41

D
dapan1121 已提交
42
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
43 44
}

D
dapan1121 已提交
45
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
D
dapan1121 已提交
46
  qTaskInfo_t taskHandle = ctx->taskHandle;
D
dapan1121 已提交
47

D
dapan1121 已提交
48
  if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
D
dapan1121 已提交
49 50
    if (ctx->explain) {
      SExplainExecInfo *execInfo = NULL;
dengyihao's avatar
dengyihao 已提交
51
      int32_t           resNum = 0;
D
dapan1121 已提交
52
      QW_ERR_RET(qGetExplainExecInfo(taskHandle, &resNum, &execInfo));
53 54 55 56

      SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
      connInfo.ahandle = NULL;
      QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
D
dapan1121 已提交
57 58 59 60 61 62
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
63
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
dengyihao's avatar
dengyihao 已提交
64 65
  int32_t        code = 0;
  bool           qcontinue = true;
66
  SSDataBlock   *pRes = NULL;
dengyihao's avatar
dengyihao 已提交
67 68 69
  uint64_t       useconds = 0;
  int32_t        i = 0;
  int32_t        execNum = 0;
D
dapan1121 已提交
70
  qTaskInfo_t    taskHandle = ctx->taskHandle;
D
dapan1121 已提交
71
  DataSinkHandle sinkHandle = ctx->sinkHandle;
dengyihao's avatar
dengyihao 已提交
72

D
dapan1121 已提交
73
  while (true) {
H
Haojun Liao 已提交
74
    QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
D
dapan1121 已提交
75

D
dapan1121 已提交
76 77 78 79 80 81
    pRes = NULL;

    // if *taskHandle is NULL, it's killed right now
    if (taskHandle) {
      code = qExecTask(taskHandle, &pRes, &useconds);
      if (code) {
82 83 84 85 86
        if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
          QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        } else {
          QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
        }
D
dapan1121 已提交
87 88
        QW_ERR_RET(code);
      }
D
dapan1121 已提交
89 90
    }

D
dapan1121 已提交
91 92
    ++execNum;

D
dapan1121 已提交
93
    if (NULL == pRes) {
dengyihao's avatar
dengyihao 已提交
94
      QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
H
Haojun Liao 已提交
95

D
dapan1121 已提交
96
      dsEndPut(sinkHandle, useconds);
D
dapan1121 已提交
97 98

      QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
D
dapan1121 已提交
99 100 101 102

      if (queryEnd) {
        *queryEnd = true;
      }
dengyihao's avatar
dengyihao 已提交
103

D
dapan1121 已提交
104 105 106
      break;
    }

D
dapan1121 已提交
107
    int32_t rows = pRes->info.rows;
H
Haojun Liao 已提交
108

109 110
    ASSERT(pRes->info.rows > 0);

H
Haojun Liao 已提交
111
    SInputData inputData = {.pData = pRes};
D
dapan1121 已提交
112 113
    code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
    if (code) {
D
dapan1121 已提交
114 115
      QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
D
dapan1121 已提交
116
    }
D
dapan1121 已提交
117

D
dapan1121 已提交
118
    QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);
dengyihao's avatar
dengyihao 已提交
119

D
dapan1121 已提交
120 121 122 123
    if (!qcontinue) {
      break;
    }

D
dapan1121 已提交
124 125 126 127 128
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
      break;
    }

    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
129 130
      break;
    }
D
dapan1121 已提交
131

D
dapan1121 已提交
132 133 134
    if (atomic_load_32(&ctx->rspCode)) {
      break;
    }
D
dapan1121 已提交
135 136 137 138
  }

  QW_RET(code);
}
D
dapan1121 已提交
139

D
dapan1121 已提交
140
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
D
dapan1121 已提交
141 142
  int32_t taskNum = 0;

D
dapan1121 已提交
143
  hbInfo->connInfo = sch->hbConnInfo;
D
dapan1121 已提交
144
  hbInfo->rsp.epId = sch->hbEpId;
D
dapan1121 已提交
145

D
dapan1121 已提交
146
  QW_LOCK(QW_READ, &sch->tasksLock);
dengyihao's avatar
dengyihao 已提交
147

D
dapan1121 已提交
148
  taskNum = taosHashGetSize(sch->tasksHash);
D
dapan1121 已提交
149 150 151

  hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
  if (NULL == hbInfo->rsp.taskStatus) {
D
dapan1121 已提交
152
    QW_UNLOCK(QW_READ, &sch->tasksLock);
D
dapan1121 已提交
153
    QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
D
dapan1121 已提交
154 155 156
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

157
  void       *key = NULL;
dengyihao's avatar
dengyihao 已提交
158 159
  size_t      keyLen = 0;
  int32_t     i = 0;
D
dapan1121 已提交
160
  STaskStatus status = {0};
D
dapan1121 已提交
161 162 163 164

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
D
dapan1121 已提交
165
    key = taosHashGetKey(pIter, &keyLen);
D
dapan1121 已提交
166

dengyihao's avatar
dengyihao 已提交
167
    // TODO GET EXECUTOR API TO GET MORE INFO
D
dapan1121 已提交
168

D
dapan1121 已提交
169 170 171
    QW_GET_QTID(key, status.queryId, status.taskId);
    status.status = taskStatus->status;
    status.refId = taskStatus->refId;
dengyihao's avatar
dengyihao 已提交
172

D
dapan1121 已提交
173
    taosArrayPush(hbInfo->rsp.taskStatus, &status);
dengyihao's avatar
dengyihao 已提交
174

D
dapan1121 已提交
175 176
    ++i;
    pIter = taosHashIterate(sch->tasksHash, pIter);
dengyihao's avatar
dengyihao 已提交
177
  }
D
dapan1121 已提交
178 179 180 181 182 183

  QW_UNLOCK(QW_READ, &sch->tasksLock);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
184
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
dengyihao's avatar
dengyihao 已提交
185
  int32_t            len = 0;
D
dapan1121 已提交
186
  SRetrieveTableRsp *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
187 188
  bool               queryEnd = false;
  int32_t            code = 0;
D
dapan1121 已提交
189

D
dapan1121 已提交
190
  dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
D
dapan1121 已提交
191

D
dapan1121 已提交
192 193 194 195
  if (len < 0) {
    QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
D
dapan1121 已提交
196

D
dapan1121 已提交
197 198
  if (len == 0) {
    if (queryEnd) {
D
dapan 已提交
199
      code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
200
      if (code) {
D
dapan1121 已提交
201
        QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
202 203
        QW_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
204

D
dapan1121 已提交
205
      QW_TASK_DLOG_E("no data in sink and query end");
H
Haojun Liao 已提交
206

D
dapan1121 已提交
207
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
L
Liu Jicong 已提交
208
      QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
209

D
dapan1121 已提交
210
      *rspMsg = rsp;
D
dapan 已提交
211
      *dataLen = 0;
D
dapan1121 已提交
212 213
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
214 215

    pOutput->bufStatus = DS_BUF_EMPTY;
D
dapan1121 已提交
216

D
dapan1121 已提交
217
    return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
218
  }
D
dapan1121 已提交
219

D
dapan1121 已提交
220
  // Got data from sink
D
dapan1121 已提交
221
  QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
D
dapan1121 已提交
222

D
dapan 已提交
223
  *dataLen = len;
dengyihao's avatar
dengyihao 已提交
224

D
dapan1121 已提交
225
  QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
D
dapan 已提交
226
  *rspMsg = rsp;
dengyihao's avatar
dengyihao 已提交
227

D
dapan 已提交
228 229
  pOutput->pData = rsp->data;
  code = dsGetDataBlock(ctx->sinkHandle, pOutput);
D
dapan1121 已提交
230
  if (code) {
D
dapan1121 已提交
231
    QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
232 233
    QW_ERR_RET(code);
  }
D
dapan1121 已提交
234

D
dapan1121 已提交
235
  if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
D
dapan1121 已提交
236
    QW_TASK_DLOG_E("task all data fetched, done");
D
dapan1121 已提交
237
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
D
dapan1121 已提交
238 239
  }

D
dapan1121 已提交
240
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
241 242
}

243

D
dapan1121 已提交
244
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
245 246
  int32_t         code = 0;
  SQWTaskCtx     *ctx = NULL;
S
Shengliang Guan 已提交
247
  SRpcHandleInfo *cancelConnection = NULL;
D
dapan1121 已提交
248

D
dapan1121 已提交
249
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
250 251 252 253 254 255

  if (QW_PHASE_PRE_QUERY == phase) {
    QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
  } else {
    QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
  }
dengyihao's avatar
dengyihao 已提交
256

D
dapan1121 已提交
257
  QW_LOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
258

D
dapan1121 已提交
259
  if (QW_PHASE_PRE_FETCH == phase) {
dengyihao's avatar
dengyihao 已提交
260
    atomic_store_8((int8_t *)&ctx->queryFetched, true);
D
dapan1121 已提交
261
  } else {
D
dapan1121 已提交
262 263
    atomic_store_8(&ctx->phase, phase);
  }
D
dapan1121 已提交
264

dengyihao's avatar
dengyihao 已提交
265
  if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
D
dapan1121 已提交
266 267 268
    QW_TASK_ELOG_E("query already end");
    QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
  }
D
dapan1121 已提交
269

D
dapan1121 已提交
270 271
  switch (phase) {
    case QW_PHASE_PRE_QUERY: {
D
dapan1121 已提交
272
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
H
Haojun Liao 已提交
273
        QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
D
dapan1121 已提交
274
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
D
dapan1121 已提交
275 276
        break;
      }
D
dapan1121 已提交
277

D
dapan1121 已提交
278
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
279
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
280

D
dapan1121 已提交
281 282
        //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        //QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
283

D
dapan1121 已提交
284
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan 已提交
285
        break;
D
dapan1121 已提交
286
      }
D
dapan1121 已提交
287

D
dapan1121 已提交
288
      QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
D
dapan1121 已提交
289 290
      break;
    }
D
dapan1121 已提交
291
    case QW_PHASE_PRE_FETCH: {
D
dapan1121 已提交
292 293
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
        QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
294 295
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
      }
D
dapan1121 已提交
296

D
dapan1121 已提交
297
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
D
dapan1121 已提交
298
        QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
299 300 301 302
        QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
      }

      if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
303
        QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
304
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
D
dapan1121 已提交
305 306
      }
      break;
dengyihao's avatar
dengyihao 已提交
307
    }
D
dapan1121 已提交
308 309
    case QW_PHASE_PRE_CQUERY: {
      if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
310
        QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
311
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
312
      }
D
dapan1121 已提交
313

D
dapan1121 已提交
314
      if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
315
        QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
H
Haojun Liao 已提交
316

D
dapan1121 已提交
317 318
        //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
        //QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
319

D
dapan1121 已提交
320
        QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
321
      }
D
dapan1121 已提交
322

D
dapan1121 已提交
323
      break;
D
dapan1121 已提交
324 325 326 327 328 329 330
    }
    default:
      QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (ctx->rspCode) {
331
    QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode));
D
dapan1121 已提交
332
    QW_ERR_JRET(ctx->rspCode);
D
dapan1121 已提交
333
  }
D
dapan1121 已提交
334

D
dapan1121 已提交
335 336
_return:
  if (ctx) {
D
dapan1121 已提交
337
    QW_UPDATE_RSP_CODE(ctx, code);
dengyihao's avatar
dengyihao 已提交
338

D
dapan1121 已提交
339
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
340 341
    qwReleaseTaskCtx(mgmt, ctx);
  }
D
dapan1121 已提交
342

D
dapan1121 已提交
343
  if (cancelConnection) {
S
shm  
Shengliang Guan 已提交
344
    qwBuildAndSendCancelRsp(cancelConnection, code);
D
dapan1121 已提交
345
    QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
D
dapan1121 已提交
346 347
  }

348 349 350 351 352
  if (code != TSDB_CODE_SUCCESS) {
    QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  } else {
    QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
  }
D
dapan1121 已提交
353 354 355 356 357

  QW_RET(code);
}

int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
S
Shengliang Guan 已提交
358 359 360
  int32_t         code = 0;
  SQWTaskCtx     *ctx = NULL;
  SRpcHandleInfo  connInfo = {0};
D
dapan1121 已提交
361
  SRpcHandleInfo *rspConnection = NULL;
D
dapan1121 已提交
362

D
dapan1121 已提交
363
  QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
dengyihao's avatar
dengyihao 已提交
364

D
dapan1121 已提交
365
  QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
366

D
dapan1121 已提交
367 368 369
  QW_LOCK(QW_WRITE, &ctx->lock);

  if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
370
    QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
D
dapan1121 已提交
371 372 373 374
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
  }

  if (QW_PHASE_POST_QUERY == phase) {
D
dapan1121 已提交
375
#if 0    
D
dapan1121 已提交
376
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
D
dapan1121 已提交
377
      readyConnection = &ctx->connInfo;
D
dapan1121 已提交
378 379
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
    }
D
dapan1121 已提交
380
#else
381
    connInfo = ctx->ctrlConnInfo;
D
dapan1121 已提交
382
    rspConnection = &connInfo;
dengyihao's avatar
dengyihao 已提交
383

D
dapan1121 已提交
384 385
    QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
#endif
D
dapan1121 已提交
386 387
  }

D
dapan1121 已提交
388
  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
389 390 391 392
    if (QW_PHASE_POST_FETCH == phase) {
      QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }
D
dapan1121 已提交
393

D
dapan1121 已提交
394 395
    //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
    //QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
396

D
dapan1121 已提交
397 398
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
    QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
D
dapan1121 已提交
399 400 401
  }

  if (ctx->rspCode) {
dengyihao's avatar
dengyihao 已提交
402 403
    QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode,
                 tstrerror(ctx->rspCode));
D
dapan1121 已提交
404
    QW_ERR_JRET(ctx->rspCode);
dengyihao's avatar
dengyihao 已提交
405
  }
D
dapan1121 已提交
406

D
dapan1121 已提交
407
  QW_ERR_JRET(input->code);
D
dapan1121 已提交
408 409 410

_return:

D
dapan1121 已提交
411 412 413 414
  if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
  }

D
dapan1121 已提交
415 416 417
  if (rspConnection) {
    qwBuildAndSendQueryRsp(rspConnection, code, ctx ? &ctx->tbInfo : NULL);
    QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code));
418 419
  }

D
dapan1121 已提交
420
  if (ctx) {
D
dapan1121 已提交
421
    QW_UPDATE_RSP_CODE(ctx, code);
D
dapan1121 已提交
422

D
dapan1121 已提交
423 424 425
    if (QW_PHASE_POST_FETCH != phase) {
      atomic_store_8(&ctx->phase, phase);
    }
dengyihao's avatar
dengyihao 已提交
426

D
dapan1121 已提交
427
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
428
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
429 430
  }

D
dapan1121 已提交
431 432
  if (code) {
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
D
dapan1121 已提交
433 434
  }

D
dapan1121 已提交
435
  QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
D
dapan1121 已提交
436

D
dapan1121 已提交
437 438 439
  QW_RET(code);
}

D
dapan1121 已提交
440
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
441 442 443 444 445 446 447
  int32_t        code = 0;
  bool           queryRsped = false;
  SSubplan      *plan = NULL;
  SQWPhaseInput  input = {0};
  qTaskInfo_t    pTaskInfo = NULL;
  DataSinkHandle sinkHandle = NULL;
  SQWTaskCtx    *ctx = NULL;
D
dapan1121 已提交
448

D
dapan1121 已提交
449
  QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
D
dapan1121 已提交
450

D
dapan1121 已提交
451
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
D
dapan1121 已提交
452 453

  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
454

D
dapan1121 已提交
455
  atomic_store_8(&ctx->taskType, taskType);
D
dapan1121 已提交
456
  atomic_store_8(&ctx->explain, explain);
X
Xiaoyu Wang 已提交
457

458
  ctx->ctrlConnInfo = qwMsg->connInfo;
D
dapan1121 已提交
459

460
  /*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/
X
Xiaoyu Wang 已提交
461

D
dapan1121 已提交
462 463
  code = qStringToSubplan(qwMsg->msg, &plan);
  if (TSDB_CODE_SUCCESS != code) {
464 465
    code = TSDB_CODE_INVALID_MSG;
    QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
466
    QW_ERR_JRET(code);
D
dapan1121 已提交
467
  }
dengyihao's avatar
dengyihao 已提交
468

D
dapan1121 已提交
469 470
  ctx->plan = plan;

471
  code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
D
dapan1121 已提交
472
  if (code) {
D
dapan1121 已提交
473
    QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
D
dapan1121 已提交
474
    QW_ERR_JRET(code);
D
dapan1121 已提交
475
  }
D
dapan1121 已提交
476

H
Haojun Liao 已提交
477
  if (NULL == sinkHandle || NULL == pTaskInfo) {
D
dapan1121 已提交
478 479 480 481
    QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

dengyihao's avatar
dengyihao 已提交
482 483
  // QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
  // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
484

dengyihao's avatar
dengyihao 已提交
485
  // queryRsped = true;
D
dapan1121 已提交
486

D
dapan1121 已提交
487 488 489
  atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
  atomic_store_ptr(&ctx->sinkHandle, sinkHandle);

D
dapan1121 已提交
490
  if (pTaskInfo && sinkHandle) {
491
    qwSaveTbVersionInfo(pTaskInfo, ctx);
D
dapan1121 已提交
492
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
D
dapan1121 已提交
493
  }
dengyihao's avatar
dengyihao 已提交
494

D
dapan1121 已提交
495 496
_return:

D
dapan1121 已提交
497 498
  input.code = code;
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
499

dengyihao's avatar
dengyihao 已提交
500
  // if (!queryRsped) {
D
dapan1121 已提交
501 502 503
  //  qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
  //  QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
  //}
D
dapan1121 已提交
504

D
dapan1121 已提交
505
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
506 507
}

D
dapan1121 已提交
508
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
509
  SQWTaskCtx   *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
510
  int32_t       code = 0;
511
  SQWPhaseInput input = {0};
512
  void         *rsp = NULL;
dengyihao's avatar
dengyihao 已提交
513 514 515
  int32_t       dataLen = 0;
  bool          queryEnd = false;

D
dapan1121 已提交
516
  do {
D
dapan1121 已提交
517
    QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
D
dapan1121 已提交
518

D
dapan1121 已提交
519
    QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
D
dapan1121 已提交
520

dengyihao's avatar
dengyihao 已提交
521 522
    atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
    atomic_store_8((int8_t *)&ctx->queryContinue, 0);
D
dapan1121 已提交
523

D
dapan1121 已提交
524
    QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));
D
dapan1121 已提交
525

D
dapan1121 已提交
526 527 528
    if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
      SOutputData sOutput = {0};
      QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
dengyihao's avatar
dengyihao 已提交
529 530

      if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
531
        QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
dengyihao's avatar
dengyihao 已提交
532 533

        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
534
      }
dengyihao's avatar
dengyihao 已提交
535

D
dapan1121 已提交
536
      if (rsp) {
D
dapan1121 已提交
537
        bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
538

D
dapan1121 已提交
539
        qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
540
        if (qComplete) {
dengyihao's avatar
dengyihao 已提交
541
          atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
542
        }
H
Haojun Liao 已提交
543

D
dapan1121 已提交
544
        qwMsg->connInfo = ctx->dataConnInfo;
dengyihao's avatar
dengyihao 已提交
545 546 547 548 549
        QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);

        qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
        QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
                     tstrerror(code), dataLen);
D
dapan1121 已提交
550
      } else {
dengyihao's avatar
dengyihao 已提交
551
        atomic_store_8((int8_t *)&ctx->queryContinue, 1);
552 553 554
      }
    }

dengyihao's avatar
dengyihao 已提交
555
  _return:
556

D
dapan1121 已提交
557 558 559 560
    if (NULL == ctx) {
      break;
    }

D
dapan1121 已提交
561
    if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
dengyihao's avatar
dengyihao 已提交
562
      QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
563 564
      qwFreeFetchRsp(rsp);
      rsp = NULL;
dengyihao's avatar
dengyihao 已提交
565

D
dapan1121 已提交
566
      qwMsg->connInfo = ctx->dataConnInfo;
S
shm  
Shengliang Guan 已提交
567
      qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
dengyihao's avatar
dengyihao 已提交
568 569
      QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                   0);
570
    }
D
dapan1121 已提交
571

D
dapan1121 已提交
572
    QW_LOCK(QW_WRITE, &ctx->lock);
dengyihao's avatar
dengyihao 已提交
573
    if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
D
dapan1121 已提交
574
      // Note: if necessary, fetch need to put cquery to queue again
D
dapan1121 已提交
575
      atomic_store_8(&ctx->phase, 0);
dengyihao's avatar
dengyihao 已提交
576
      QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
577 578
      break;
    }
dengyihao's avatar
dengyihao 已提交
579
    QW_UNLOCK(QW_WRITE, &ctx->lock);
D
dapan1121 已提交
580
  } while (true);
D
dapan1121 已提交
581

D
dapan1121 已提交
582
  input.code = code;
dengyihao's avatar
dengyihao 已提交
583
  qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);
D
dapan1121 已提交
584

dengyihao's avatar
dengyihao 已提交
585
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
586
}
D
dapan1121 已提交
587

D
dapan1121 已提交
588
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
589 590 591
  int32_t       code = 0;
  int32_t       dataLen = 0;
  bool          locked = false;
592 593
  SQWTaskCtx   *ctx = NULL;
  void         *rsp = NULL;
D
dapan1121 已提交
594
  SQWPhaseInput input = {0};
D
dapan1121 已提交
595

D
dapan1121 已提交
596
  QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
597

598
  QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
599

D
dapan 已提交
600 601
  SOutputData sOutput = {0};
  QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
D
dapan1121 已提交
602

603
  if (NULL == rsp) {
D
dapan1121 已提交
604
    ctx->dataConnInfo = qwMsg->connInfo;
dengyihao's avatar
dengyihao 已提交
605

606
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
D
dapan1121 已提交
607
  } else {
D
dapan1121 已提交
608
    bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
dengyihao's avatar
dengyihao 已提交
609

D
dapan1121 已提交
610
    qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
D
dapan1121 已提交
611
    if (qComplete) {
dengyihao's avatar
dengyihao 已提交
612
      atomic_store_8((int8_t *)&ctx->queryEnd, true);
D
dapan1121 已提交
613
    }
D
dapan1121 已提交
614 615
  }

dengyihao's avatar
dengyihao 已提交
616
  if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
D
dapan1121 已提交
617
    QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
D
dapan1121 已提交
618

D
dapan1121 已提交
619 620
    QW_LOCK(QW_WRITE, &ctx->lock);
    locked = true;
621

D
dapan1121 已提交
622
    // RC WARNING
D
dapan1121 已提交
623
    if (QW_IS_QUERY_RUNNING(ctx)) {
dengyihao's avatar
dengyihao 已提交
624 625
      atomic_store_8((int8_t *)&ctx->queryContinue, 1);
    } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
H
Haojun Liao 已提交
626
      qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);
D
dapan1121 已提交
627

dengyihao's avatar
dengyihao 已提交
628 629
      atomic_store_8((int8_t *)&ctx->queryInQueue, 1);

D
dapan1121 已提交
630
      QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
631
    }
D
dapan 已提交
632
  }
dengyihao's avatar
dengyihao 已提交
633

D
dapan1121 已提交
634
_return:
D
dapan1121 已提交
635

D
dapan1121 已提交
636 637 638 639 640
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

  input.code = code;
D
dapan1121 已提交
641
  code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
D
dapan1121 已提交
642

D
dapan 已提交
643 644 645
  if (code) {
    qwFreeFetchRsp(rsp);
    rsp = NULL;
D
dapan1121 已提交
646
    dataLen = 0;
D
dapan1121 已提交
647 648 649
  }

  if (code || rsp) {
S
shm  
Shengliang Guan 已提交
650
    qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
dengyihao's avatar
dengyihao 已提交
651 652
    QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
                 dataLen);
D
dapan1121 已提交
653 654
  }

D
dapan1121 已提交
655
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
656
}
D
dapan1121 已提交
657

D
dapan1121 已提交
658
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
dengyihao's avatar
dengyihao 已提交
659 660
  int32_t     code = 0;
  bool        rsped = false;
D
dapan1121 已提交
661
  SQWTaskCtx *ctx = NULL;
dengyihao's avatar
dengyihao 已提交
662
  bool        locked = false;
D
dapan1121 已提交
663

D
dapan1121 已提交
664 665
  // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED

D
dapan1121 已提交
666
  QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
dengyihao's avatar
dengyihao 已提交
667

D
dapan1121 已提交
668 669 670 671 672
  QW_LOCK(QW_WRITE, &ctx->lock);

  locked = true;

  if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
D
dapan1121 已提交
673
    QW_TASK_WLOG_E("task already dropping");
D
dapan1121 已提交
674 675 676
    QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
  }

D
dapan1121 已提交
677
  if (QW_IS_QUERY_RUNNING(ctx)) {
D
dapan1121 已提交
678
    QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
D
dapan1121 已提交
679
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
D
dapan1121 已提交
680
  } else if (ctx->phase > 0) {
D
dapan1121 已提交
681
    QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
D
dapan1121 已提交
682
    rsped = true;
D
dapan1121 已提交
683 684
  } else {
    // task not started
D
dapan1121 已提交
685
  }
D
dapan1121 已提交
686

D
dapan1121 已提交
687
  if (!rsped) {
688
    ctx->ctrlConnInfo = qwMsg->connInfo;
dengyihao's avatar
dengyihao 已提交
689

D
dapan1121 已提交
690 691
    QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
  }
692

D
dapan1121 已提交
693
_return:
D
dapan1121 已提交
694

D
dapan1121 已提交
695
  if (code) {
D
dapan1121 已提交
696 697 698
    if (ctx) {
      QW_UPDATE_RSP_CODE(ctx, code);
    }
H
Haojun Liao 已提交
699

D
dapan1121 已提交
700
    qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
D
dapan1121 已提交
701 702
  }

D
dapan 已提交
703 704 705 706
  if (locked) {
    QW_UNLOCK(QW_WRITE, &ctx->lock);
  }

D
dapan1121 已提交
707
  if (ctx) {
D
dapan1121 已提交
708
    qwReleaseTaskCtx(mgmt, ctx);
D
dapan1121 已提交
709 710
  }

D
dapan1121 已提交
711 712 713
  QW_RET(TSDB_CODE_SUCCESS);
}

D
dapan1121 已提交
714
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
dengyihao's avatar
dengyihao 已提交
715
  int32_t         code = 0;
D
dapan1121 已提交
716
  SSchedulerHbRsp rsp = {0};
717
  SQWSchStatus   *sch = NULL;
D
dapan1121 已提交
718

D
dapan1121 已提交
719 720 721
  if (qwMsg->code) {
    QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
  }
D
dapan1121 已提交
722 723 724

  QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));

D
dapan1121 已提交
725 726
  QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));

D
dapan1121 已提交
727
  QW_LOCK(QW_WRITE, &sch->hbConnLock);
D
dapan1121 已提交
728

D
dapan1121 已提交
729
  if (sch->hbConnInfo.handle) {
730
    tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
731
    sch->hbConnInfo.handle = NULL;
D
dapan1121 已提交
732
  }
D
dapan1121 已提交
733

D
dapan1121 已提交
734
  memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
D
dapan1121 已提交
735
  memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
dengyihao's avatar
dengyihao 已提交
736

D
dapan1121 已提交
737
  QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
dengyihao's avatar
dengyihao 已提交
738 739 740

  QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
          req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
D
dapan1121 已提交
741

D
dapan1121 已提交
742 743 744 745
  qwReleaseScheduler(QW_READ, mgmt);

_return:

D
dapan1121 已提交
746 747
  memcpy(&rsp.epId, &req->epId, sizeof(req->epId));

S
shm  
Shengliang Guan 已提交
748
  qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
D
dapan1121 已提交
749 750

  if (code) {
751
    tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
S
Shengliang Guan 已提交
752
    qwMsg->connInfo.handle = NULL;
D
dapan1121 已提交
753
  }
dengyihao's avatar
dengyihao 已提交
754

755
  /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/
dengyihao's avatar
dengyihao 已提交
756

D
dapan1121 已提交
757
  QW_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
758 759 760
}

void qwProcessHbTimerEvent(void *param, void *tmrId) {
761
  SQWHbParam *hbParam = (SQWHbParam *)param;
D
dapan1121 已提交
762 763 764 765
  if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
    return;
  }

766
  int64_t   refId = hbParam->refId;
D
dapan1121 已提交
767 768 769 770 771 772
  SQWorker *mgmt = qwAcquire(refId);
  if (NULL == mgmt) {
    QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
    taosMemoryFree(param);
    return;
  }
773

D
dapan1121 已提交
774
  SQWSchStatus *sch = NULL;
dengyihao's avatar
dengyihao 已提交
775
  int32_t       taskNum = 0;
776
  SQWHbInfo    *rspList = NULL;
dengyihao's avatar
dengyihao 已提交
777
  int32_t       code = 0;
D
dapan1121 已提交
778

D
dapan1121 已提交
779 780
  qwDbgDumpMgmtInfo(mgmt);

D
dapan1121 已提交
781 782 783 784 785
  QW_LOCK(QW_READ, &mgmt->schLock);

  int32_t schNum = taosHashGetSize(mgmt->schHash);
  if (schNum <= 0) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
786
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
787
    qwRelease(refId);
D
dapan1121 已提交
788
    return;
D
dapan1121 已提交
789 790
  }

wafwerar's avatar
wafwerar 已提交
791
  rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
D
dapan1121 已提交
792 793
  if (NULL == rspList) {
    QW_UNLOCK(QW_READ, &mgmt->schLock);
D
dapan1121 已提交
794 795
    QW_ELOG("calloc %d SQWHbInfo failed", schNum);
    taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
D
dapan1121 已提交
796
    qwRelease(refId);
D
dapan1121 已提交
797
    return;
D
dapan1121 已提交
798 799
  }

800
  void   *key = NULL;
dengyihao's avatar
dengyihao 已提交
801
  size_t  keyLen = 0;
D
dapan1121 已提交
802 803 804 805
  int32_t i = 0;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
D
dapan1121 已提交
806 807 808
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
    if (NULL == sch->hbConnInfo.handle) {
      uint64_t *sId = taosHashGetKey(pIter, NULL);
D
dapan1121 已提交
809
      QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
D
dapan1121 已提交
810 811 812
      pIter = taosHashIterate(mgmt->schHash, pIter);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
813

D
dapan1121 已提交
814 815 816 817 818 819 820 821 822 823 824 825 826 827 828
    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
    if (code) {
      taosHashCancelIterate(mgmt->schHash, pIter);
      QW_ERR_JRET(code);
    }

    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

_return:

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  for (int32_t j = 0; j < i; ++j) {
S
shm  
Shengliang Guan 已提交
829
    qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
830 831
    /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
    /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
D
dapan1121 已提交
832
    tFreeSSchedulerHbRsp(&rspList[j].rsp);
D
dapan1121 已提交
833 834
  }

wafwerar's avatar
wafwerar 已提交
835
  taosMemoryFreeClear(rspList);
D
dapan1121 已提交
836

dengyihao's avatar
dengyihao 已提交
837
  taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
838
  qwRelease(refId);
D
dapan1121 已提交
839 840 841
}


S
Shengliang Guan 已提交
842
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
843
  if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
D
dapan1121 已提交
844 845 846
    qError("invalid param to init qworker");
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
S
Shengliang 已提交
847

D
dapan1121 已提交
848 849 850 851
  int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
  if (1 == qwNum) {
    memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
  }
D
dapan1121 已提交
852 853 854 855 856 857 858 859

  int32_t code = qwOpenRef();
  if (code) {
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
    QW_RET(code);
  }

  SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
D
dapan1121 已提交
860
  if (NULL == mgmt) {
D
dapan1121 已提交
861 862
    qError("calloc %d failed", (int32_t)sizeof(SQWorker));
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
863
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
864 865 866 867
  }

  if (cfg) {
    mgmt->cfg = *cfg;
D
dapan1121 已提交
868
    if (0 == mgmt->cfg.maxSchedulerNum) {
D
dapan1121 已提交
869
      mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
D
dapan1121 已提交
870 871
    }
    if (0 == mgmt->cfg.maxTaskNum) {
D
dapan1121 已提交
872
      mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
D
dapan1121 已提交
873 874
    }
    if (0 == mgmt->cfg.maxSchTaskNum) {
D
dapan1121 已提交
875
      mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
876
    }
D
dapan1121 已提交
877
  } else {
D
dapan1121 已提交
878 879 880
    mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
    mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
    mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
D
dapan1121 已提交
881 882
  }

dengyihao's avatar
dengyihao 已提交
883 884
  mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                               HASH_ENTRY_LOCK);
D
dapan1121 已提交
885
  if (NULL == mgmt->schHash) {
wafwerar's avatar
wafwerar 已提交
886
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
887
    qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
D
dapan1121 已提交
888
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
889 890
  }

dengyihao's avatar
dengyihao 已提交
891 892
  mgmt->ctxHash =
      taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
893
  if (NULL == mgmt->ctxHash) {
D
dapan1121 已提交
894
    qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
D
dapan1121 已提交
895 896 897 898 899 900 901 902 903
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
  if (NULL == mgmt->timer) {
    qError("init timer failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
904 905
  mgmt->nodeType = nodeType;
  mgmt->nodeId = nodeId;
S
Shengliang Guan 已提交
906
  mgmt->msgCb = *pMsgCb;
D
dapan1121 已提交
907

D
dapan1121 已提交
908 909 910 911 912 913
  mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
  if (mgmt->refId < 0) {
    qError("taosAddRef qw failed, error:%s", tstrerror(terrno));
    QW_ERR_JRET(terrno);
  }

D
dapan1121 已提交
914 915 916
  SQWHbParam *param = NULL;
  qwSetHbParam(mgmt->refId, &param);

917
  mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer);
D
dapan1121 已提交
918 919 920 921
  if (NULL == mgmt->hbTimer) {
    qError("start hb timer failed");
    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
922

D
dapan1121 已提交
923 924
  *qWorkerMgmt = mgmt;

D
dapan1121 已提交
925 926
  qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);

D
dapan1121 已提交
927
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
928 929 930

_return:

D
dapan1121 已提交
931 932 933 934 935 936 937
  if (mgmt->refId >= 0) {
    qwRelease(mgmt->refId);
  } else {
    taosHashCleanup(mgmt->schHash);
    taosHashCleanup(mgmt->ctxHash);
    taosTmrCleanUp(mgmt->timer);
    taosMemoryFreeClear(mgmt);
D
dapan1121 已提交
938

939
    atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
D
dapan1121 已提交
940
  }
941

D
dapan1121 已提交
942
  QW_RET(code);
D
dapan1121 已提交
943
}
D
dapan1121 已提交
944 945 946 947

void qWorkerDestroy(void **qWorkerMgmt) {
  if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
    return;
D
dapan1121 已提交
948
  }
D
dapan 已提交
949

D
dapan1121 已提交
950
  SQWorker *mgmt = *qWorkerMgmt;
D
dapan1121 已提交
951

D
dapan1121 已提交
952 953 954
  if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
    qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
  }
D
dapan1121 已提交
955
}
D
dapan1121 已提交
956

D
dapan1121 已提交
957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat) {
  if (NULL == handle || NULL == qWorkerMgmt || NULL == pStat) {
    QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
  SDataSinkStat sinkStat = {0};
  
  dsDataSinkGetCacheSize(&sinkStat);
  pStat->cacheDataSize = sinkStat.cachedSize;
  
  pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed);
  pStat->cqueryProcessed = QW_STAT_GET(mgmt->stat.msgStat.cqueryProcessed);
  pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed);
  pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed);
  pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed);

  pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE);
  pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE);
  pStat->timeInQueryQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, QUERY_QUEUE);
  pStat->timeInFetchQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, FETCH_QUEUE);

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
980 981 982
}


dengyihao's avatar
dengyihao 已提交
983