qwMsg.c 18.4 KB
Newer Older
D
dapan1121 已提交
1
#include "qwMsg.h"
L
Liu Jicong 已提交
2
#include "dataSinkMgt.h"
D
dapan1121 已提交
3 4 5
#include "executor.h"
#include "planner.h"
#include "query.h"
L
Liu Jicong 已提交
6
#include "qworker.h"
D
dapan1121 已提交
7
#include "qwInt.h"
L
Liu Jicong 已提交
8
#include "tcommon.h"
D
dapan1121 已提交
9 10 11 12 13
#include "tmsg.h"
#include "tname.h"

int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
  int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
L
Liu Jicong 已提交
14

D
dapan1121 已提交
15 16 17 18 19 20 21 22 23 24 25 26 27
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", msgSize);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memset(pRsp, 0, sizeof(SRetrieveTableRsp));

  *rsp = pRsp;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
28
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
D
dapan1121 已提交
29
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
L
Liu Jicong 已提交
30

D
dapan1121 已提交
31
  rsp->useconds = htobe64(input->useconds);
D
dapan1121 已提交
32
  rsp->completed = qComplete;
D
dapan1121 已提交
33 34 35 36
  rsp->precision = input->precision;
  rsp->compressed = input->compressed;
  rsp->compLen = htonl(len);
  rsp->numOfRows = htonl(input->numOfRows);
37
  rsp->numOfCols = htonl(input->numOfCols);
D
dapan1121 已提交
38 39 40
}

void qwFreeFetchRsp(void *msg) {
D
dapan 已提交
41 42 43
  if (msg) {
    rpcFreeCont(msg);
  }
D
dapan1121 已提交
44 45
}

D
dapan1121 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code) {
  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = NULL,
      .contLen = 0,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
60 61 62
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
  STbVerInfo* tbInfo = ctx ? &ctx->tbInfo : NULL;
  int64_t affectedRows = ctx ? ctx->affectedRows : 0;
D
dapan1121 已提交
63
  SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
D
dapan1121 已提交
64 65
  pRsp->code = htonl(code);
  pRsp->affectedRows = htobe64(affectedRows);
66 67
  if (tbInfo) {
    strcpy(pRsp->tbFName, tbInfo->tbFName);
D
dapan1121 已提交
68 69
    pRsp->sversion = htonl(tbInfo->sversion);
    pRsp->tversion = htonl(tbInfo->tversion);
70
  }
D
dapan1121 已提交
71 72

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
73
      .msgType = rspType,
L
Liu Jicong 已提交
74 75 76
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
77
      .info = *pConn,
D
dapan1121 已提交
78 79
  };

S
shm  
Shengliang Guan 已提交
80
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
81 82 83 84

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
85 86 87
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList) {
  SExplainExecInfo* pInfo = taosArrayGet(pExecList, 0);
  SExplainRsp rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo};
D
dapan1121 已提交
88 89

  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
dengyihao's avatar
dengyihao 已提交
90
  void *  pRsp = rpcMallocCont(contLen);
D
dapan1121 已提交
91 92 93
  tSerializeSExplainRsp(pRsp, contLen, &rsp);

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
94
      .msgType = TDMT_SCH_EXPLAIN_RSP,
L
Liu Jicong 已提交
95 96 97
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
98
      .info = *pConn,
D
dapan1121 已提交
99 100
  };

H
Haojun Liao 已提交
101
  rpcRsp.info.ahandle = NULL;
D
dapan1121 已提交
102 103 104 105
  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
106
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
D
dapan1121 已提交
107
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
dengyihao's avatar
dengyihao 已提交
108
  void *  pRsp = rpcMallocCont(contLen);
D
dapan1121 已提交
109
  tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
D
dapan1121 已提交
110 111

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
112
      .msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP,
L
Liu Jicong 已提交
113
      .contLen = contLen,
114
      .pCont = pRsp,
L
Liu Jicong 已提交
115
      .code = code,
116
      .info = *pConn,
D
dapan1121 已提交
117 118
  };

S
shm  
Shengliang Guan 已提交
119
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
120 121 122 123

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
124
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
D
dapan1121 已提交
125 126 127 128 129 130 131
  if (NULL == pRsp) {
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
132
      .msgType = rspType,
L
Liu Jicong 已提交
133 134 135
      .pCont = pRsp,
      .contLen = sizeof(*pRsp) + dataLength,
      .code = code,
136
      .info = *pConn,
D
dapan1121 已提交
137 138
  };

S
shm  
Shengliang Guan 已提交
139
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
140 141 142 143

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
144
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
D
dapan1121 已提交
145 146 147 148
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
149
      .msgType = TDMT_SCH_CANCEL_TASK_RSP,
L
Liu Jicong 已提交
150 151 152
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
153
      .info = *pConn,
D
dapan1121 已提交
154 155
  };

S
shm  
Shengliang Guan 已提交
156
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
157 158 159
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
160
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
D
dapan1121 已提交
161 162 163 164
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
165
      .msgType = TDMT_SCH_DROP_TASK_RSP,
L
Liu Jicong 已提交
166 167 168
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
169
      .info = *pConn,
D
dapan1121 已提交
170 171
  };

S
shm  
Shengliang Guan 已提交
172
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
173 174 175
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->header.vgId = mgmt->nodeId;
  req->sId = sId;
  req->queryId = qId;
  req->taskId = tId;
  req->refId = rId;
  req->execId = eId;

  SRpcMsg pNewMsg = {
      .msgType = TDMT_SCH_DROP_TASK,
      .pCont = req,
      .contLen = sizeof(STaskDropReq),
      .code = 0,
      .info = *pConn,
  };

  int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg);
  if (TSDB_CODE_SUCCESS != code) {
    QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
    QW_ERR_RET(code);
  }

  QW_SCH_TASK_DLOG("drop task msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}


S
Shengliang Guan 已提交
210
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
L
Liu Jicong 已提交
211
  SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
D
dapan1121 已提交
212 213 214 215 216 217 218 219 220
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->header.vgId = mgmt->nodeId;
  req->sId = sId;
  req->queryId = qId;
  req->taskId = tId;
D
dapan1121 已提交
221
  req->execId = eId;
D
dapan1121 已提交
222 223

  SRpcMsg pNewMsg = {
D
dapan1121 已提交
224
      .msgType = TDMT_SCH_QUERY_CONTINUE,
L
Liu Jicong 已提交
225 226 227
      .pCont = req,
      .contLen = sizeof(SQueryContinueReq),
      .code = 0,
228
      .info = *pConn,
D
dapan1121 已提交
229 230
  };

S
Shengliang Guan 已提交
231
  int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
D
dapan1121 已提交
232
  if (TSDB_CODE_SUCCESS != code) {
H
Haojun Liao 已提交
233
    QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
D
dapan1121 已提交
234 235 236
    QW_ERR_RET(code);
  }

D
dapan1121 已提交
237
  QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);
D
dapan1121 已提交
238 239 240 241

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
242
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
L
Liu Jicong 已提交
243
  STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
D
dapan1121 已提交
244 245 246
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
L
Liu Jicong 已提交
247
  }
D
dapan1121 已提交
248

D
dapan1121 已提交
249 250 251 252 253
  req->header.vgId = htonl(mgmt->nodeId);
  req->sId = htobe64(sId);
  req->queryId = htobe64(qId);
  req->taskId = htobe64(tId);
  req->refId = htobe64(rId);
L
Liu Jicong 已提交
254

S
Shengliang Guan 已提交
255
  SRpcMsg brokenMsg = {
D
dapan1121 已提交
256
      .msgType = TDMT_SCH_DROP_TASK,
L
Liu Jicong 已提交
257 258
      .pCont = req,
      .contLen = sizeof(STaskDropReq),
D
dapan1121 已提交
259
      .code = TSDB_CODE_RPC_BROKEN_LINK,
260
      .info = *pConn,
D
dapan1121 已提交
261
  };
L
Liu Jicong 已提交
262

S
Shengliang Guan 已提交
263
  tmsgRegisterBrokenLinkArg(&brokenMsg);
D
dapan1121 已提交
264 265 266 267

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
268
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) {
D
dapan1121 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("calloc %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    taosMemoryFree(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
L
Liu Jicong 已提交
288

S
Shengliang Guan 已提交
289
  SRpcMsg brokenMsg = {
D
dapan1121 已提交
290
      .msgType = TDMT_SCH_QUERY_HEARTBEAT,
L
Liu Jicong 已提交
291 292
      .pCont = msg,
      .contLen = msgSize,
D
dapan1121 已提交
293
      .code = TSDB_CODE_RPC_BROKEN_LINK,
294
      .info = *pConn,
D
dapan1121 已提交
295
  };
L
Liu Jicong 已提交
296

S
Shengliang Guan 已提交
297
  tmsgRegisterBrokenLinkArg(&brokenMsg);
D
dapan1121 已提交
298 299 300 301

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SSubQueryMsg *msg = pMsg->pCont;
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
320
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
321 322 323 324 325 326 327
  msg->phyLen = ntohl(msg->phyLen);
  msg->sqlLen = ntohl(msg->sqlLen);

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
D
dapan1121 已提交
328
  int32_t  eId = msg->execId;
D
dapan1121 已提交
329

D
dapan1121 已提交
330
  SQWMsg qwMsg = {.msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
D
dapan1121 已提交
331 332

  QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
D
dapan1121 已提交
333
  QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg));
D
dapan1121 已提交
334 335 336 337 338
  QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
339 340 341 342 343 344 345 346 347 348 349 350
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SSubQueryMsg *msg = pMsg->pCont;
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
D
dapan1121 已提交
351
  int32_t  eId = msg->execId;
D
dapan1121 已提交
352 353 354 355 356 357 358 359

  QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
  qwAbortPrerocessQuery(QW_FPARAMS());
  QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
360
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
361 362 363 364
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

L
Liu Jicong 已提交
365
  int32_t       code = 0;
D
dapan1121 已提交
366
  SSubQueryMsg *msg = pMsg->pCont;
dengyihao's avatar
dengyihao 已提交
367
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
368

D
dapan1121 已提交
369 370
  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
D
dapan1121 已提交
371

D
dapan1121 已提交
372
  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
D
dapan1121 已提交
373
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
374
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
375 376 377 378 379
  }

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
380
  int64_t  rId = msg->refId;
D
dapan1121 已提交
381
  int32_t  eId = msg->execId;
D
dapan1121 已提交
382

D
dapan1121 已提交
383
  SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType};
D
dapan1121 已提交
384 385 386 387
  qwMsg.msgInfo.explain = msg->explain;
  qwMsg.msgInfo.taskType = msg->taskType;
  qwMsg.msgInfo.needFetch = msg->needFetch;
  
dengyihao's avatar
dengyihao 已提交
388
  char * sql = strndup(msg->msg, msg->sqlLen);
D
dapan1121 已提交
389
  QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
D
dapan1121 已提交
390
  QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));
D
dapan1121 已提交
391
  QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
392

L
Liu Jicong 已提交
393
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
394 395
}

D
dapan1121 已提交
396
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
L
Liu Jicong 已提交
397 398 399
  int32_t            code = 0;
  int8_t             status = 0;
  bool               queryDone = false;
400
  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
L
Liu Jicong 已提交
401
  bool               needStop = false;
dengyihao's avatar
dengyihao 已提交
402 403
  SQWTaskCtx *       handles = NULL;
  SQWorker *         mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
404

D
dapan1121 已提交
405 406
  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);
D
dapan1121 已提交
407

D
dapan1121 已提交
408
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
409
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
410
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
411 412
  }

413 414 415
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
L
Liu Jicong 已提交
416
  int64_t  rId = 0;
D
dapan1121 已提交
417
  int32_t  eId = msg->execId;
D
dapan1121 已提交
418

419
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
D
dapan1121 已提交
420

S
Shengliang Guan 已提交
421
  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);
422 423 424

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
425
  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);
426

L
Liu Jicong 已提交
427
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
428 429
}

D
dapan1121 已提交
430
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
431 432 433 434 435
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SResFetchReq *msg = pMsg->pCont;
dengyihao's avatar
dengyihao 已提交
436
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
437

D
dapan1121 已提交
438 439
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
D
dapan1121 已提交
440

D
dapan1121 已提交
441
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
L
Liu Jicong 已提交
442
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
443
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
444
  }
D
dapan1121 已提交
445

D
dapan1121 已提交
446 447 448
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
D
dapan1121 已提交
449
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
450 451 452 453

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
L
Liu Jicong 已提交
454
  int64_t  rId = 0;
D
dapan1121 已提交
455
  int32_t  eId = msg->execId;
D
dapan1121 已提交
456

D
dapan1121 已提交
457
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
D
dapan1121 已提交
458

S
Shengliang Guan 已提交
459
  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
460 461 462

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
463
  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);
464

L
Liu Jicong 已提交
465
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
466 467
}

D
dapan1121 已提交
468
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
469
  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
470 471
  if (mgmt) {
    qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
D
dapan1121 已提交
472
    QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
D
dapan1121 已提交
473
  }
D
dapan1121 已提交
474

D
dapan1121 已提交
475
  qProcessRspMsg(NULL, pMsg, NULL);
476
  pMsg->pCont = NULL;
S
Shengliang 已提交
477 478 479
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
480
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
481 482 483 484
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

dengyihao's avatar
dengyihao 已提交
485
  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
486
  int32_t         code = 0;
D
dapan1121 已提交
487
  STaskCancelReq *msg = pMsg->pCont;
D
dapan1121 已提交
488

D
dapan1121 已提交
489 490
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);
D
dapan1121 已提交
491

D
dapan1121 已提交
492
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
L
Liu Jicong 已提交
493
    qError("invalid task cancel msg");
D
dapan1121 已提交
494
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
495
  }
D
dapan1121 已提交
496

D
dapan1121 已提交
497 498 499 500
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
501
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
502

D
dapan1121 已提交
503 504 505 506
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
D
dapan1121 已提交
507
  int32_t  eId = msg->execId;
D
dapan1121 已提交
508

509
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
D
dapan1121 已提交
510

L
Liu Jicong 已提交
511
  // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
D
dapan1121 已提交
512 513 514

_return:

S
shm  
Shengliang Guan 已提交
515
  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
D
dapan1121 已提交
516
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
517 518 519 520

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
521
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
522 523 524 525
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

L
Liu Jicong 已提交
526
  int32_t       code = 0;
D
dapan1121 已提交
527
  STaskDropReq *msg = pMsg->pCont;
dengyihao's avatar
dengyihao 已提交
528
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
529

D
dapan1121 已提交
530 531
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
D
dapan1121 已提交
532

D
dapan1121 已提交
533
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
534
    QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
535
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
536
  }
D
dapan1121 已提交
537

D
dapan1121 已提交
538 539 540
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
D
dapan1121 已提交
541
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
542
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
543 544 545 546

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
547
  int64_t  rId = msg->refId;
D
dapan1121 已提交
548
  int32_t  eId = msg->execId;
D
dapan1121 已提交
549

550
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
D
dapan1121 已提交
551

D
dapan1121 已提交
552
  if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
L
Liu Jicong 已提交
553
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
D
dapan1121 已提交
554
  }
D
dapan1121 已提交
555

S
Shengliang Guan 已提交
556
  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);
557 558 559

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
560
  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);
561 562

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
563 564
}

D
dapan1121 已提交
565
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
566 567 568 569
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

L
Liu Jicong 已提交
570
  int32_t         code = 0;
D
dapan1121 已提交
571
  SSchedulerHbReq req = {0};
dengyihao's avatar
dengyihao 已提交
572
  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
573

D
dapan1121 已提交
574 575
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);
D
dapan1121 已提交
576

D
dapan1121 已提交
577 578 579
  if (NULL == pMsg->pCont) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
580
  }
D
dapan1121 已提交
581 582 583 584 585 586 587 588

  if (tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  uint64_t sId = req.sId;
589
  SQWMsg   qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
D
dapan1121 已提交
590
  if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
L
Liu Jicong 已提交
591
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
D
dapan1121 已提交
592 593
  }

S
Shengliang Guan 已提交
594
  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);
D
dapan1121 已提交
595 596 597 598 599 600 601

  QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
602 603


604 605
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
D
dapan1121 已提交
606 607 608 609 610 611 612 613 614 615 616 617 618 619 620
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SVDeleteReq req = {0};
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);

  tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req);
  
  uint64_t sId = req.sId;
  uint64_t qId = req.queryId;
  uint64_t tId = req.taskId;
  int64_t  rId = 0;
D
dapan1121 已提交
621
  int32_t  eId = -1;
D
dapan1121 已提交
622 623 624 625 626

  SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
  QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
  taosMemoryFreeClear(req.sql);

627
  QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));
D
dapan1121 已提交
628 629 630 631 632 633 634 635 636

  QW_SCH_TASK_DLOG("processDelete end, node:%p", node);

_return:

  QW_RET(code);
}