qwMsg.c 18.7 KB
Newer Older
D
dapan1121 已提交
1
#include "qwMsg.h"
L
Liu Jicong 已提交
2
#include "dataSinkMgt.h"
D
dapan1121 已提交
3 4 5
#include "executor.h"
#include "planner.h"
#include "query.h"
D
dapan1121 已提交
6
#include "qwInt.h"
H
Hongze Cheng 已提交
7
#include "qworker.h"
L
Liu Jicong 已提交
8
#include "tcommon.h"
D
dapan1121 已提交
9 10 11
#include "tmsg.h"
#include "tname.h"

D
dapan1121 已提交
12
/**
 * Allocate or grow the buffer that holds a fetch response.
 *
 * @param rpcMalloc non-zero to resize via rpcReallocCont(), zero to resize via taosMemoryRealloc()
 * @param length    number of payload bytes that follow the SRetrieveTableRsp header
 * @param rsp       in/out: current buffer (may point to NULL); receives the resized buffer on success
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the resize fails
 *         (in which case *rsp is left untouched)
 */
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) {
  const int32_t totalLen = sizeof(SRetrieveTableRsp) + length;
  // Remember whether this is a first-time allocation before *rsp is overwritten.
  const bool firstAlloc = (NULL == *rsp);

  void *buf = NULL;
  if (rpcMalloc) {
    buf = rpcReallocCont(*rsp, totalLen);
  } else {
    buf = taosMemoryRealloc(*rsp, totalLen);
  }

  if (NULL == buf) {
    qError("rpcMallocCont %d failed", totalLen);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRetrieveTableRsp *grown = (SRetrieveTableRsp *)buf;
  if (firstAlloc) {
    // Only a brand-new buffer gets its header cleared; a grown one keeps its content.
    memset(grown, 0, sizeof(SRetrieveTableRsp));
  }

  *rsp = grown;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
31
/**
 * Fill the SRetrieveTableRsp header located at `msg` from a sink output descriptor.
 *
 * @param msg       buffer that starts with an SRetrieveTableRsp header
 * @param input     source of row/column/block counts, precision, compression flag and elapsed time
 * @param len       value stored (after htonl) in the header's compLen field
 * @param qComplete stored as-is in the header's `completed` field
 *
 * Multi-byte counters are converted with htonl()/htobe64(); single-byte fields are copied verbatim.
 */
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
  SRetrieveTableRsp *pHeader = (SRetrieveTableRsp *)msg;

  // Fields copied without byte-order conversion.
  pHeader->completed = qComplete;
  pHeader->precision = input->precision;
  pHeader->compressed = input->compressed;

  // Fields that go through host-to-network conversion.
  pHeader->useconds = htobe64(input->useconds);
  pHeader->compLen = htonl(len);
  pHeader->numOfRows = htonl(input->numOfRows);
  pHeader->numOfCols = htonl(input->numOfCols);
  pHeader->numOfBlocks = htonl(input->numOfBlocks);
}

/**
 * Release a fetch response buffer through rpcFreeCont().
 *
 * @param msg buffer to release; NULL is accepted and ignored
 */
void qwFreeFetchRsp(void *msg) {
  if (NULL == msg) {
    return;
  }

  rpcFreeCont(msg);
}

D
dapan1121 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63
/**
 * Send a body-less response that only carries a result code.
 *
 * @param rspType message type placed in the response
 * @param pConn   connection info copied into the response
 * @param code    result code placed in the response
 * @return always TSDB_CODE_SUCCESS
 */
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code) {
  SRpcMsg errRsp = {0};

  errRsp.msgType = rspType;
  errRsp.pCont = NULL;  // no payload, only the code travels back
  errRsp.contLen = 0;
  errRsp.code = code;
  errRsp.info = *pConn;

  tmsgSendRsp(&errRsp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
64
/**
 * Build an SQueryTableRsp and send it back on the given connection.
 *
 * @param rspType message type of the response
 * @param pConn   connection info copied into the response
 * @param code    query result code; stored both in the body (after htonl) and in the RPC header
 * @param ctx     optional task context; when non-NULL its table version info and
 *                affected-row count are copied into the body
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the body cannot be allocated
 *         (nothing is sent in that case)
 */
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
  STbVerInfo     *tbInfo = ctx ? &ctx->tbInfo : NULL;
  int64_t         affectedRows = ctx ? ctx->affectedRows : 0;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
  if (NULL == pRsp) {
    // Previously the NULL result was dereferenced right away.
    qError("rpcMallocCont %d failed", (int32_t)sizeof(SQueryTableRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pRsp->code = htonl(code);
  pRsp->affectedRows = htobe64(affectedRows);
  if (tbInfo) {
    // NOTE(review): unbounded copy; assumes both tbFName buffers have the same capacity - confirm.
    strcpy(pRsp->tbFName, tbInfo->tbFName);
    pRsp->sversion = htonl(tbInfo->sversion);
    pRsp->tversion = htonl(tbInfo->tversion);
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
89 90 91
/**
 * Serialize the explain execution info list and send it as a TDMT_SCH_EXPLAIN_RSP.
 *
 * @param pConn     connection info copied into the response (its ahandle is cleared before sending)
 * @param pExecList array of SExplainExecInfo; element 0 is used as the start of the sub-plan info
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when sizing, allocating or
 *         serializing the body fails (nothing is sent in that case)
 */
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList) {
  SExplainExecInfo *pInfo = taosArrayGet(pExecList, 0);
  SExplainRsp       rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    qError("tSerializeSExplainRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSExplainRsp(pRsp, contLen, &rsp) < 0) {
    qError("tSerializeSExplainRsp failed, size:%d", contLen);
    rpcFreeCont(pRsp);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_EXPLAIN_RSP,
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
      .info = *pConn,
  };

  rpcRsp.info.ahandle = NULL;
  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
110
/**
 * Serialize a scheduler heartbeat status and send it as a TDMT_SCH_QUERY_HEARTBEAT_RSP.
 *
 * @param pConn   connection info copied into the response
 * @param pStatus heartbeat status to serialize into the body
 * @param code    result code placed in the RPC header
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when sizing, allocating or
 *         serializing the body fails (nothing is sent in that case)
 */
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
  if (contLen < 0) {
    qError("tSerializeSSchedulerHbRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus) < 0) {
    qError("tSerializeSSchedulerHbRsp failed, size:%d", contLen);
    rpcFreeCont(pRsp);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP,
      .contLen = contLen,
      .pCont = pRsp,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
128 129
/**
 * Send a fetch response.
 *
 * @param rspType    message type of the response
 * @param pConn      connection info copied into the response
 * @param pRsp       prepared response buffer, or NULL to send a zeroed, data-less header
 * @param dataLength number of payload bytes following the header (forced to 0 when pRsp is NULL)
 * @param code       result code placed in the RPC header
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the placeholder header
 *         cannot be allocated (nothing is sent in that case)
 */
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
                               int32_t code) {
  if (NULL == pRsp) {
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (NULL == pRsp) {
      // Previously the NULL result was handed straight to memset().
      qError("rpcMallocCont %d failed", (int32_t)sizeof(SRetrieveTableRsp));
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp) + dataLength,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
149
/**
 * Send a TDMT_SCH_CANCEL_TASK_RSP carrying `code` in both the body and the RPC header.
 *
 * @param pConn connection info copied into the response
 * @param code  result code of the cancel request
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the body cannot be allocated
 *         (nothing is sent in that case)
 */
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(STaskCancelRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  pRsp->code = code;

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_CANCEL_TASK_RSP,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
165
/**
 * Send a TDMT_SCH_DROP_TASK_RSP carrying `code` in both the body and the RPC header.
 *
 * @param pConn connection info copied into the response
 * @param code  result code of the drop request
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the body cannot be allocated
 *         (nothing is sent in that case)
 */
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  pRsp->code = code;

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_DROP_TASK_RSP,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
/**
 * Build a TDMT_SCH_DROP_TASK request for the current task and put it on the FETCH_QUEUE.
 *
 * The task identifiers come from QW_FPARAMS_DEF and are stored without byte-order conversion.
 *
 * @param pConn connection info copied into the queued message
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure,
 *         or the error returned by tmsgPutToQueue()
 */
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  STaskDropReq *dropReq = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (NULL == dropReq) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  dropReq->header.vgId = mgmt->nodeId;
  dropReq->sId = sId;
  dropReq->queryId = qId;
  dropReq->taskId = tId;
  dropReq->refId = rId;
  dropReq->execId = eId;

  SRpcMsg queued = {0};
  queued.msgType = TDMT_SCH_DROP_TASK;
  queued.pCont = dropReq;
  queued.contLen = sizeof(STaskDropReq);
  queued.code = 0;
  queued.info = *pConn;

  int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &queued);
  if (code != TSDB_CODE_SUCCESS) {
    QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
    QW_ERR_RET(code);
  }

  QW_SCH_TASK_DLOG("drop task msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
214
/**
 * Build a TDMT_SCH_QUERY_CONTINUE request for the current task and put it on the QUERY_QUEUE.
 *
 * The task identifiers come from QW_FPARAMS_DEF and are stored without byte-order conversion.
 *
 * @param pConn connection info copied into the queued message
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure,
 *         or the error returned by tmsgPutToQueue()
 */
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  SQueryContinueReq *contReq = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
  if (NULL == contReq) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  contReq->header.vgId = mgmt->nodeId;
  contReq->sId = sId;
  contReq->queryId = qId;
  contReq->taskId = tId;
  contReq->execId = eId;

  SRpcMsg queued = {0};
  queued.msgType = TDMT_SCH_QUERY_CONTINUE;
  queued.pCont = contReq;
  queued.contLen = sizeof(SQueryContinueReq);
  queued.code = 0;
  queued.info = *pConn;

  int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &queued);
  if (code != TSDB_CODE_SUCCESS) {
    QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
    QW_ERR_RET(code);
  }

  QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
246
/**
 * Register a TDMT_SCH_DROP_TASK message, tagged TSDB_CODE_RPC_BROKEN_LINK, as the
 * broken-link argument of the given connection.
 *
 * Identifiers are stored after htonl()/htobe64() conversion.
 *
 * @param pConn connection info copied into the registered message
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure
 */
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  STaskDropReq *dropReq = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (NULL == dropReq) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  dropReq->header.vgId = htonl(mgmt->nodeId);
  dropReq->sId = htobe64(sId);
  dropReq->queryId = htobe64(qId);
  dropReq->taskId = htobe64(tId);
  dropReq->refId = htobe64(rId);
  // NOTE(review): execId is not filled in here, unlike qwBuildAndSendDropMsg - confirm intended.

  SRpcMsg linkDownMsg = {0};
  linkDownMsg.msgType = TDMT_SCH_DROP_TASK;
  linkDownMsg.pCont = dropReq;
  linkDownMsg.contLen = sizeof(STaskDropReq);
  linkDownMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
  linkDownMsg.info = *pConn;

  tmsgRegisterBrokenLinkArg(&linkDownMsg);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
272
/**
 * Register a serialized TDMT_SCH_QUERY_HEARTBEAT request, tagged TSDB_CODE_RPC_BROKEN_LINK,
 * as the broken-link argument of the given connection.
 *
 * @param mgmt  worker whose nodeId is placed in the request header
 * @param sId   scheduler id placed in the request
 * @param pConn connection info copied into the registered message
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when sizing, allocating or
 *         serializing the request fails
 */
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) {
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  // First pass with a NULL buffer only computes the encoded size.
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("calloc %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    // The buffer came from rpcMallocCont(), so it is released with the matching
    // rpcFreeCont() rather than taosMemoryFree().
    rpcFreeCont(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg brokenMsg = {
      .msgType = TDMT_SCH_QUERY_HEARTBEAT,
      .pCont = msg,
      .contLen = msgSize,
      .code = TSDB_CODE_RPC_BROKEN_LINK,
      .info = *pConn,
  };

  tmsgRegisterBrokenLinkArg(&brokenMsg);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
306 307 308 309 310 311 312
/**
 * Validate a sub-query message, convert its header fields to host byte order IN PLACE,
 * and hand it to qwPreprocessQuery().
 *
 * Because the conversion rewrites the message, later stages read the header fields
 * directly (see qWorkerProcessQueryMsg, which performs no conversion).
 *
 * @param qWorkerMgmt worker handle (SQWorker *)
 * @param pMsg        incoming RPC message whose pCont is an SSubQueryMsg
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL/short input,
 *         or the error returned by qwPreprocessQuery()
 */
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
  SSubQueryMsg *msg = pMsg->pCont;

  // The body must be strictly larger than the fixed header.
  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // 64-bit identifiers.
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
  // 32-bit fields.
  msg->execId = ntohl(msg->execId);
  msg->phyLen = ntohl(msg->phyLen);
  msg->sqlLen = ntohl(msg->sqlLen);

  // Locals with these exact names are consumed by QW_FPARAMS() and the logging macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {0};
  qwMsg.msgType = pMsg->msgType;
  qwMsg.msg = msg->msg + msg->sqlLen;  // the plan starts right after the SQL text
  qwMsg.msgLen = msg->phyLen;
  qwMsg.connInfo = pMsg->info;

  QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
  QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg));
  QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
344 345 346 347 348 349
/**
 * Undo the effect of qWorkerPreprocessQueryMsg() for the task identified by the message.
 *
 * The header fields are read as-is, with no byte-order conversion in this function.
 *
 * @param qWorkerMgmt worker handle (SQWorker *)
 * @param pMsg        RPC message whose pCont is an SSubQueryMsg
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_INVALID_INPUT when an argument or the
 *         message body is NULL
 */
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SSubQueryMsg *msg = pMsg->pCont;
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;

  // Guard the body pointer like the other handlers do before reading header fields.
  if (NULL == msg) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
  int32_t  eId = msg->execId;

  QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
  qwAbortPrerocessQuery(QW_FPARAMS());
  QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
365
/**
 * Handle a sub-query message: update queue statistics, extract the SQL text and plan
 * from the message body and pass them to qwProcessQuery().
 *
 * Header fields are read without byte-order conversion in this function.
 *
 * @param node        owning node handle, forwarded in the SQWMsg
 * @param qWorkerMgmt worker handle (SQWorker *)
 * @param pMsg        incoming RPC message whose pCont is an SSubQueryMsg
 * @param ts          timestamp passed to qwUpdateTimeInQueue()
 * @return result of qwProcessQuery(), or TSDB_CODE_QRY_INVALID_INPUT for NULL/short input
 */
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
  SSubQueryMsg *msg = pMsg->pCont;

  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);

  // The body must be strictly larger than the fixed header.
  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Locals with these exact names are consumed by QW_FPARAMS() and the logging macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.msg = msg->msg + msg->sqlLen;  // the plan starts right after the SQL text
  qwMsg.msgLen = msg->phyLen;
  qwMsg.connInfo = pMsg->info;
  qwMsg.msgType = pMsg->msgType;
  qwMsg.msgInfo.explain = msg->explain;
  qwMsg.msgInfo.taskType = msg->taskType;
  qwMsg.msgInfo.needFetch = msg->needFetch;

  // NUL-terminated copy of the SQL text; it is handed to qwProcessQuery() and not freed here.
  char *sql = strndup(msg->msg, msg->sqlLen);
  QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType),
                   pMsg->info.handle, sql);
  QW_ERR_JRET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));

_return:

  QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%d", node, code);

  return code;
}

D
dapan1121 已提交
409
/**
 * Handle a query-continue message: update queue statistics and pass the task
 * identifiers to qwProcessCQuery().
 *
 * Header fields are read without byte-order conversion in this function.
 *
 * @param node        owning node handle, forwarded in the SQWMsg without being dereferenced here
 * @param qWorkerMgmt worker handle (SQWorker *)
 * @param pMsg        incoming RPC message whose pCont is an SQueryContinueReq
 * @param ts          timestamp passed to qwUpdateTimeInQueue()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL/short input,
 *         or the error returned by qwProcessCQuery()
 */
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  // Both pointers are dereferenced below; reject NULL up front like the sibling handlers.
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
  SQWorker          *mgmt = (SQWorker *)qWorkerMgmt;

  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Locals with these exact names are consumed by QW_FPARAMS() and the logging macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = 0;  // the continue request supplies no ref id
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};

  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
443
/**
 * Handle a fetch request: update queue statistics, convert the request header to host
 * byte order IN PLACE and pass the task identifiers to qwProcessFetch().
 *
 * @param node        owning node handle, forwarded in the SQWMsg
 * @param qWorkerMgmt worker handle (SQWorker *)
 * @param pMsg        incoming RPC message whose pCont is an SResFetchReq
 * @param ts          timestamp passed to qwUpdateTimeInQueue()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL/short input,
 *         or the error returned by qwProcessFetch()
 */
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
  SResFetchReq *msg = pMsg->pCont;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Convert the header once; the converted values stay in the message.
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->execId = ntohl(msg->execId);

  // Locals with these exact names are consumed by QW_FPARAMS() and the logging macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = 0;  // the fetch request supplies no ref id
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.msg = NULL;
  qwMsg.msgLen = 0;
  qwMsg.connInfo = pMsg->info;
  qwMsg.msgType = pMsg->msgType;

  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
481
/**
 * Handle a response message: update queue statistics when a worker handle is supplied,
 * forward the message to qProcessRspMsg() and then clear pMsg->pCont.
 *
 * @param node        unused in this function
 * @param qWorkerMgmt worker handle (SQWorker *); may be NULL, in which case no statistics are updated
 * @param pMsg        response message to forward
 * @param ts          timestamp passed to qwUpdateTimeInQueue()
 * @return always TSDB_CODE_SUCCESS
 */
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;

  if (NULL != mgmt) {
    qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
    QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
  }

  qProcessRspMsg(NULL, pMsg, NULL);
  // Drop our reference to the body after it has been handed over.
  pMsg->pCont = NULL;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
493
/**
 * Handle a task-cancel request: update queue statistics, convert the request header to
 * host byte order IN PLACE and reply with a cancel response.
 *
 * No cancellation is performed in this function; the response always carries code 0.
 *
 * @param node        owning node handle, forwarded in the SQWMsg
 * @param qWorkerMgmt worker handle (SQWorker *)
 * @param pMsg        incoming RPC message whose pCont is an STaskCancelReq
 * @param ts          timestamp passed to qwUpdateTimeInQueue()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL/short input,
 *         or the error returned by qwBuildAndSendCancelRsp()
 */
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t         code = 0;
  SQWorker       *mgmt = (SQWorker *)qWorkerMgmt;
  STaskCancelReq *msg = pMsg->pCont;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    qError("invalid task cancel msg");
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Convert the header once; the converted values stay in the message.
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
  msg->execId = ntohl(msg->execId);

  // Locals with these exact names are consumed by the logging macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.msg = NULL;
  qwMsg.msgLen = 0;
  qwMsg.connInfo = pMsg->info;

  // TODO: the actual cancel step (qwCancelTask) is disabled; only the reply is sent.

  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
534
/**
 * Handle a task-drop request: update queue statistics, convert the request header to
 * host byte order IN PLACE and pass the task identifiers to qwProcessDrop().
 *
 * @param node        owning node handle, forwarded in the SQWMsg
 * @param qWorkerMgmt worker handle (SQWorker *)
 * @param pMsg        incoming RPC message whose pCont is an STaskDropReq; its code is
 *                    forwarded and logged when it equals TSDB_CODE_RPC_BROKEN_LINK
 * @param ts          timestamp passed to qwUpdateTimeInQueue()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL/short input,
 *         or the error returned by qwProcessDrop()
 */
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
  STaskDropReq *msg = pMsg->pCont;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Convert the header once; the converted values stay in the message.
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
  msg->execId = ntohl(msg->execId);

  // Locals with these exact names are consumed by QW_FPARAMS() and the logging macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.msg = NULL;
  qwMsg.msgLen = 0;
  qwMsg.code = pMsg->code;
  qwMsg.connInfo = pMsg->info;

  if (pMsg->code == TSDB_CODE_RPC_BROKEN_LINK) {
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
578
/**
 * Handle a scheduler heartbeat: update queue statistics, deserialize the request and
 * pass it to qwProcessHb().
 *
 * @param node        owning node handle, forwarded in the SQWMsg
 * @param qWorkerMgmt worker handle (SQWorker *)
 * @param pMsg        incoming RPC message holding a serialized SSchedulerHbReq; its code is
 *                    forwarded and logged when it equals TSDB_CODE_RPC_BROKEN_LINK
 * @param ts          timestamp passed to qwUpdateTimeInQueue()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL or undecodable input,
 *         or the error returned by qwProcessHb()
 */
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker       *mgmt = (SQWorker *)qWorkerMgmt;
  SSchedulerHbReq req = {0};

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);

  if (NULL == pMsg->pCont) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // A non-zero result means the body could not be decoded; release whatever was decoded so far.
  if (0 != tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // `sId` is consumed by the QW_SCH_* logging macros.
  uint64_t sId = req.sId;

  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.msg = NULL;
  qwMsg.msgLen = 0;
  qwMsg.code = pMsg->code;
  qwMsg.connInfo = pMsg->info;

  if (pMsg->code == TSDB_CODE_RPC_BROKEN_LINK) {
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
615

616 617
/**
 * Handle a delete request: deserialize it, run qwProcessDelete() on the embedded plan
 * and release the buffers owned by the decoded request.
 *
 * @param node        owning node handle, forwarded in the SQWMsg
 * @param qWorkerMgmt worker handle (SQWorker *)
 * @param pMsg        incoming RPC message holding a serialized SVDeleteReq
 * @param pRes        receives the delete result from qwProcessDelete()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL or undecodable input,
 *         or the error returned by qwProcessDelete()
 */
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t     code = 0;
  SVDeleteReq req = {0};
  SQWorker   *mgmt = (SQWorker *)qWorkerMgmt;

  QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);

  // Previously the decode result was ignored and a failed decode went on with a zeroed request.
  if (0 != tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid delete msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    taosMemoryFreeClear(req.sql);
    taosMemoryFreeClear(req.msg);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Locals with these exact names are consumed by QW_FPARAMS() and the logging macros.
  uint64_t sId = req.sId;
  uint64_t qId = req.queryId;
  uint64_t tId = req.taskId;
  int64_t  rId = 0;
  int32_t  eId = -1;

  SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
  QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
  taosMemoryFreeClear(req.sql);  // the SQL text is only needed for the log line above

  QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));

  QW_SCH_TASK_DLOG("processDelete end, node:%p", node);

_return:

  // Released on both the success and the error path; the free used to sit before the
  // label, so a failing qwProcessDelete() leaked req.msg.
  taosMemoryFreeClear(req.msg);

  QW_RET(code);
}