qwMsg.c 17.8 KB
Newer Older
D
dapan1121 已提交
1
#include "qwMsg.h"
L
Liu Jicong 已提交
2
#include "dataSinkMgt.h"
D
dapan1121 已提交
3 4 5
#include "executor.h"
#include "planner.h"
#include "query.h"
L
Liu Jicong 已提交
6
#include "qworker.h"
D
dapan1121 已提交
7
#include "qwInt.h"
L
Liu Jicong 已提交
8
#include "tcommon.h"
D
dapan1121 已提交
9 10 11 12 13
#include "tmsg.h"
#include "tname.h"

int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
  int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
L
Liu Jicong 已提交
14

D
dapan1121 已提交
15 16 17 18 19 20 21 22 23 24 25 26 27
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", msgSize);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memset(pRsp, 0, sizeof(SRetrieveTableRsp));

  *rsp = pRsp;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
28
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
D
dapan1121 已提交
29
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
L
Liu Jicong 已提交
30

D
dapan1121 已提交
31
  rsp->useconds = htobe64(input->useconds);
D
dapan1121 已提交
32
  rsp->completed = qComplete;
D
dapan1121 已提交
33 34 35 36
  rsp->precision = input->precision;
  rsp->compressed = input->compressed;
  rsp->compLen = htonl(len);
  rsp->numOfRows = htonl(input->numOfRows);
37
  rsp->numOfCols = htonl(input->numOfCols);
D
dapan1121 已提交
38 39 40
}

void qwFreeFetchRsp(void *msg) {
D
dapan 已提交
41 42 43
  if (msg) {
    rpcFreeCont(msg);
  }
D
dapan1121 已提交
44 45
}

D
dapan1121 已提交
46
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
D
dapan1121 已提交
47
  SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
D
dapan1121 已提交
48
  pRsp->code = code;
49 50 51 52 53
  if (tbInfo) {
    strcpy(pRsp->tbFName, tbInfo->tbFName);
    pRsp->sversion = tbInfo->sversion;
    pRsp->tversion = tbInfo->tversion;
  }
D
dapan1121 已提交
54 55

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
56
      .msgType = rspType,
L
Liu Jicong 已提交
57 58 59
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
60
      .info = *pConn,
D
dapan1121 已提交
61 62
  };

S
shm  
Shengliang Guan 已提交
63
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
64 65 66 67

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
68
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {
D
dapan1121 已提交
69 70 71
  SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};

  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
dengyihao's avatar
dengyihao 已提交
72
  void *  pRsp = rpcMallocCont(contLen);
D
dapan1121 已提交
73 74 75
  tSerializeSExplainRsp(pRsp, contLen, &rsp);

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
76
      .msgType = TDMT_SCH_EXPLAIN_RSP,
L
Liu Jicong 已提交
77 78 79
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
80
      .info = *pConn,
D
dapan1121 已提交
81
  };
82
  rpcRsp.info.ahandle = NULL;
D
dapan1121 已提交
83 84 85 86 87 88

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
89
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
D
dapan1121 已提交
90
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
dengyihao's avatar
dengyihao 已提交
91
  void *  pRsp = rpcMallocCont(contLen);
D
dapan1121 已提交
92
  tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
D
dapan1121 已提交
93 94

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
95
      .msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP,
L
Liu Jicong 已提交
96
      .contLen = contLen,
97
      .pCont = pRsp,
L
Liu Jicong 已提交
98
      .code = code,
99
      .info = *pConn,
D
dapan1121 已提交
100 101
  };

S
shm  
Shengliang Guan 已提交
102
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
103 104 105 106

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
107
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
D
dapan1121 已提交
108 109 110 111 112 113 114
  if (NULL == pRsp) {
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
115
      .msgType = rspType,
L
Liu Jicong 已提交
116 117 118
      .pCont = pRsp,
      .contLen = sizeof(*pRsp) + dataLength,
      .code = code,
119
      .info = *pConn,
D
dapan1121 已提交
120 121
  };

S
shm  
Shengliang Guan 已提交
122
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
123 124 125 126

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
127
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
D
dapan1121 已提交
128 129 130 131
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
132
      .msgType = TDMT_SCH_CANCEL_TASK_RSP,
L
Liu Jicong 已提交
133 134 135
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
136
      .info = *pConn,
D
dapan1121 已提交
137 138
  };

S
shm  
Shengliang Guan 已提交
139
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
140 141 142
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
143
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
D
dapan1121 已提交
144 145 146 147
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
148
      .msgType = TDMT_SCH_DROP_TASK_RSP,
L
Liu Jicong 已提交
149 150 151
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
152
      .info = *pConn,
D
dapan1121 已提交
153 154
  };

S
shm  
Shengliang Guan 已提交
155
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
156 157 158
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->header.vgId = mgmt->nodeId;
  req->sId = sId;
  req->queryId = qId;
  req->taskId = tId;
  req->refId = rId;
  req->execId = eId;

  SRpcMsg pNewMsg = {
      .msgType = TDMT_SCH_DROP_TASK,
      .pCont = req,
      .contLen = sizeof(STaskDropReq),
      .code = 0,
      .info = *pConn,
  };

  int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg);
  if (TSDB_CODE_SUCCESS != code) {
    QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
    rpcFreeCont(req);
    QW_ERR_RET(code);
  }

  QW_SCH_TASK_DLOG("drop task msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}


S
Shengliang Guan 已提交
194
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
L
Liu Jicong 已提交
195
  SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
D
dapan1121 已提交
196 197 198 199 200 201 202 203 204
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->header.vgId = mgmt->nodeId;
  req->sId = sId;
  req->queryId = qId;
  req->taskId = tId;
D
dapan1121 已提交
205
  req->execId = eId;
D
dapan1121 已提交
206 207

  SRpcMsg pNewMsg = {
D
dapan1121 已提交
208
      .msgType = TDMT_SCH_QUERY_CONTINUE,
L
Liu Jicong 已提交
209 210 211
      .pCont = req,
      .contLen = sizeof(SQueryContinueReq),
      .code = 0,
212
      .info = *pConn,
D
dapan1121 已提交
213 214
  };

S
Shengliang Guan 已提交
215
  int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
D
dapan1121 已提交
216
  if (TSDB_CODE_SUCCESS != code) {
H
Haojun Liao 已提交
217
    QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
D
dapan1121 已提交
218 219 220 221
    rpcFreeCont(req);
    QW_ERR_RET(code);
  }

D
dapan1121 已提交
222
  QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);
D
dapan1121 已提交
223 224 225 226

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
227
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
L
Liu Jicong 已提交
228
  STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
D
dapan1121 已提交
229 230 231
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
L
Liu Jicong 已提交
232
  }
D
dapan1121 已提交
233

D
dapan1121 已提交
234 235 236 237 238
  req->header.vgId = htonl(mgmt->nodeId);
  req->sId = htobe64(sId);
  req->queryId = htobe64(qId);
  req->taskId = htobe64(tId);
  req->refId = htobe64(rId);
L
Liu Jicong 已提交
239

S
Shengliang Guan 已提交
240
  SRpcMsg brokenMsg = {
D
dapan1121 已提交
241
      .msgType = TDMT_SCH_DROP_TASK,
L
Liu Jicong 已提交
242 243
      .pCont = req,
      .contLen = sizeof(STaskDropReq),
D
dapan1121 已提交
244
      .code = TSDB_CODE_RPC_BROKEN_LINK,
245
      .info = *pConn,
D
dapan1121 已提交
246
  };
L
Liu Jicong 已提交
247

S
Shengliang Guan 已提交
248
  tmsgRegisterBrokenLinkArg(&brokenMsg);
D
dapan1121 已提交
249 250 251 252

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
253
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) {
D
dapan1121 已提交
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("calloc %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    taosMemoryFree(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
L
Liu Jicong 已提交
273

S
Shengliang Guan 已提交
274
  SRpcMsg brokenMsg = {
D
dapan1121 已提交
275
      .msgType = TDMT_SCH_QUERY_HEARTBEAT,
L
Liu Jicong 已提交
276 277
      .pCont = msg,
      .contLen = msgSize,
D
dapan1121 已提交
278
      .code = TSDB_CODE_RPC_BROKEN_LINK,
279
      .info = *pConn,
D
dapan1121 已提交
280
  };
L
Liu Jicong 已提交
281

S
Shengliang Guan 已提交
282
  tmsgRegisterBrokenLinkArg(&brokenMsg);
D
dapan1121 已提交
283 284 285 286

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SSubQueryMsg *msg = pMsg->pCont;
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
305
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
306 307 308 309 310 311 312
  msg->phyLen = ntohl(msg->phyLen);
  msg->sqlLen = ntohl(msg->sqlLen);

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
D
dapan1121 已提交
313
  int32_t  eId = msg->execId;
D
dapan1121 已提交
314

D
dapan1121 已提交
315
  SQWMsg qwMsg = {.msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
D
dapan1121 已提交
316 317 318 319 320 321 322 323

  QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
  QW_ERR_RET(qwPrerocessQuery(QW_FPARAMS(), &qwMsg));
  QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
324 325 326 327 328 329 330 331 332 333 334 335
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SSubQueryMsg *msg = pMsg->pCont;
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
D
dapan1121 已提交
336
  int32_t  eId = msg->execId;
D
dapan1121 已提交
337 338 339 340 341 342 343 344

  QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
  qwAbortPrerocessQuery(QW_FPARAMS());
  QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
345
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
346 347 348 349
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

L
Liu Jicong 已提交
350
  int32_t       code = 0;
D
dapan1121 已提交
351
  SSubQueryMsg *msg = pMsg->pCont;
dengyihao's avatar
dengyihao 已提交
352
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
353

D
dapan1121 已提交
354 355
  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
D
dapan1121 已提交
356

D
dapan1121 已提交
357
  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
D
dapan1121 已提交
358
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
359
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
360 361 362 363 364
  }

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
365
  int64_t  rId = msg->refId;
D
dapan1121 已提交
366
  int32_t  eId = msg->execId;
D
dapan1121 已提交
367

D
dapan1121 已提交
368
  SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType};
dengyihao's avatar
dengyihao 已提交
369
  char * sql = strndup(msg->msg, msg->sqlLen);
D
dapan1121 已提交
370
  QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
371

372
  QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql));
D
dapan1121 已提交
373
  QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
374

L
Liu Jicong 已提交
375
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
376 377
}

D
dapan1121 已提交
378
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
L
Liu Jicong 已提交
379 380 381
  int32_t            code = 0;
  int8_t             status = 0;
  bool               queryDone = false;
382
  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
L
Liu Jicong 已提交
383
  bool               needStop = false;
dengyihao's avatar
dengyihao 已提交
384 385
  SQWTaskCtx *       handles = NULL;
  SQWorker *         mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
386

D
dapan1121 已提交
387 388
  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);
D
dapan1121 已提交
389

D
dapan1121 已提交
390
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
391
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
392
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
393 394
  }

395 396 397
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
L
Liu Jicong 已提交
398
  int64_t  rId = 0;
D
dapan1121 已提交
399
  int32_t  eId = msg->execId;
D
dapan1121 已提交
400

401
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
D
dapan1121 已提交
402

S
Shengliang Guan 已提交
403
  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);
404 405 406

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
407
  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);
408

L
Liu Jicong 已提交
409
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
410 411
}

D
dapan1121 已提交
412
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
413 414 415 416 417
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SResFetchReq *msg = pMsg->pCont;
dengyihao's avatar
dengyihao 已提交
418
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
419

D
dapan1121 已提交
420 421
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
D
dapan1121 已提交
422

D
dapan1121 已提交
423
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
L
Liu Jicong 已提交
424
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
425
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
426
  }
D
dapan1121 已提交
427

D
dapan1121 已提交
428 429 430
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
D
dapan1121 已提交
431
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
432 433 434 435

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
L
Liu Jicong 已提交
436
  int64_t  rId = 0;
D
dapan1121 已提交
437
  int32_t  eId = msg->execId;
D
dapan1121 已提交
438

D
dapan1121 已提交
439
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
D
dapan1121 已提交
440

S
Shengliang Guan 已提交
441
  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
442 443 444

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
445
  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);
446

L
Liu Jicong 已提交
447
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
448 449
}

D
dapan1121 已提交
450 451
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
452 453 454 455
  if (mgmt) {
    qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
    QW_STAT_INC(mgmt->stat.msgStat.fetchRspProcessed, 1);
  }
D
dapan1121 已提交
456

S
Shengliang Guan 已提交
457
  qProcessFetchRsp(NULL, pMsg, NULL);
458
  pMsg->pCont = NULL;
S
Shengliang 已提交
459 460 461
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
462
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
463 464 465 466
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

dengyihao's avatar
dengyihao 已提交
467
  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
468
  int32_t         code = 0;
D
dapan1121 已提交
469
  STaskCancelReq *msg = pMsg->pCont;
D
dapan1121 已提交
470

D
dapan1121 已提交
471 472
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);
D
dapan1121 已提交
473

D
dapan1121 已提交
474
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
L
Liu Jicong 已提交
475
    qError("invalid task cancel msg");
D
dapan1121 已提交
476
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
477
  }
D
dapan1121 已提交
478

D
dapan1121 已提交
479 480 481 482
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
483
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
484

D
dapan1121 已提交
485 486 487 488
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
D
dapan1121 已提交
489
  int32_t  eId = msg->execId;
D
dapan1121 已提交
490

491
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
D
dapan1121 已提交
492

L
Liu Jicong 已提交
493
  // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
D
dapan1121 已提交
494 495 496

_return:

S
shm  
Shengliang Guan 已提交
497
  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
D
dapan1121 已提交
498
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
499 500 501 502

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
503
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
504 505 506 507
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

L
Liu Jicong 已提交
508
  int32_t       code = 0;
D
dapan1121 已提交
509
  STaskDropReq *msg = pMsg->pCont;
dengyihao's avatar
dengyihao 已提交
510
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
511

D
dapan1121 已提交
512 513
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
D
dapan1121 已提交
514

D
dapan1121 已提交
515
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
516
    QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
517
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
518
  }
D
dapan1121 已提交
519

D
dapan1121 已提交
520 521 522
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
D
dapan1121 已提交
523
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
524
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
525 526 527 528

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
529
  int64_t  rId = msg->refId;
D
dapan1121 已提交
530
  int32_t  eId = msg->execId;
D
dapan1121 已提交
531

532
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
D
dapan1121 已提交
533

D
dapan1121 已提交
534
  if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
L
Liu Jicong 已提交
535
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
D
dapan1121 已提交
536
  }
D
dapan1121 已提交
537

S
Shengliang Guan 已提交
538
  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);
539 540 541

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
542
  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);
543 544

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
545 546
}

D
dapan1121 已提交
547
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
548 549 550 551
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

L
Liu Jicong 已提交
552
  int32_t         code = 0;
D
dapan1121 已提交
553
  SSchedulerHbReq req = {0};
dengyihao's avatar
dengyihao 已提交
554
  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
555

D
dapan1121 已提交
556 557
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);
D
dapan1121 已提交
558

D
dapan1121 已提交
559 560 561
  if (NULL == pMsg->pCont) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
562
  }
D
dapan1121 已提交
563 564 565 566 567 568 569 570

  if (tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  uint64_t sId = req.sId;
571
  SQWMsg   qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
D
dapan1121 已提交
572
  if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
L
Liu Jicong 已提交
573
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
D
dapan1121 已提交
574 575
  }

S
Shengliang Guan 已提交
576
  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);
D
dapan1121 已提交
577 578 579 580 581 582 583

  QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
584 585


586 587
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
D
dapan1121 已提交
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SVDeleteReq req = {0};
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);

  tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req);
  
  uint64_t sId = req.sId;
  uint64_t qId = req.queryId;
  uint64_t tId = req.taskId;
  int64_t  rId = 0;
D
dapan1121 已提交
603
  int32_t  eId = -1;
D
dapan1121 已提交
604 605 606 607 608

  SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
  QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
  taosMemoryFreeClear(req.sql);

609
  QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));
D
dapan1121 已提交
610 611 612 613 614 615 616 617 618

  QW_SCH_TASK_DLOG("processDelete end, node:%p", node);

_return:

  QW_RET(code);
}