qwMsg.c 18.3 KB
Newer Older
D
dapan1121 已提交
1
#include "qwMsg.h"
L
Liu Jicong 已提交
2
#include "dataSinkMgt.h"
D
dapan1121 已提交
3 4 5
#include "executor.h"
#include "planner.h"
#include "query.h"
L
Liu Jicong 已提交
6
#include "qworker.h"
D
dapan1121 已提交
7
#include "qwInt.h"
L
Liu Jicong 已提交
8
#include "tcommon.h"
D
dapan1121 已提交
9 10 11 12 13
#include "tmsg.h"
#include "tname.h"

int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
  int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
L
Liu Jicong 已提交
14

D
dapan1121 已提交
15 16 17 18 19 20 21 22 23 24 25 26 27
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", msgSize);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memset(pRsp, 0, sizeof(SRetrieveTableRsp));

  *rsp = pRsp;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
28
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
D
dapan1121 已提交
29
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
L
Liu Jicong 已提交
30

D
dapan1121 已提交
31
  rsp->useconds = htobe64(input->useconds);
D
dapan1121 已提交
32
  rsp->completed = qComplete;
D
dapan1121 已提交
33 34 35 36
  rsp->precision = input->precision;
  rsp->compressed = input->compressed;
  rsp->compLen = htonl(len);
  rsp->numOfRows = htonl(input->numOfRows);
37
  rsp->numOfCols = htonl(input->numOfCols);
D
dapan1121 已提交
38 39 40
}

void qwFreeFetchRsp(void *msg) {
D
dapan 已提交
41 42 43
  if (msg) {
    rpcFreeCont(msg);
  }
D
dapan1121 已提交
44 45
}

D
dapan1121 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code) {
  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = NULL,
      .contLen = 0,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
60 61 62
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
  STbVerInfo* tbInfo = ctx ? &ctx->tbInfo : NULL;
  int64_t affectedRows = ctx ? ctx->affectedRows : 0;
D
dapan1121 已提交
63
  SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
D
dapan1121 已提交
64 65
  pRsp->code = htonl(code);
  pRsp->affectedRows = htobe64(affectedRows);
66 67
  if (tbInfo) {
    strcpy(pRsp->tbFName, tbInfo->tbFName);
D
dapan1121 已提交
68 69
    pRsp->sversion = htonl(tbInfo->sversion);
    pRsp->tversion = htonl(tbInfo->tversion);
70
  }
D
dapan1121 已提交
71 72

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
73
      .msgType = rspType,
L
Liu Jicong 已提交
74 75 76
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
77
      .info = *pConn,
D
dapan1121 已提交
78 79
  };

S
shm  
Shengliang Guan 已提交
80
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
81 82 83 84

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
85
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {
D
dapan1121 已提交
86 87 88
  SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};

  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
dengyihao's avatar
dengyihao 已提交
89
  void *  pRsp = rpcMallocCont(contLen);
D
dapan1121 已提交
90 91 92
  tSerializeSExplainRsp(pRsp, contLen, &rsp);

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
93
      .msgType = TDMT_SCH_EXPLAIN_RSP,
L
Liu Jicong 已提交
94 95 96
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
97
      .info = *pConn,
D
dapan1121 已提交
98
  };
99
  rpcRsp.info.ahandle = NULL;
D
dapan1121 已提交
100 101 102 103 104 105

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
106
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
D
dapan1121 已提交
107
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
dengyihao's avatar
dengyihao 已提交
108
  void *  pRsp = rpcMallocCont(contLen);
D
dapan1121 已提交
109
  tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
D
dapan1121 已提交
110 111

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
112
      .msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP,
L
Liu Jicong 已提交
113
      .contLen = contLen,
114
      .pCont = pRsp,
L
Liu Jicong 已提交
115
      .code = code,
116
      .info = *pConn,
D
dapan1121 已提交
117 118
  };

S
shm  
Shengliang Guan 已提交
119
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
120 121 122 123

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
124
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
D
dapan1121 已提交
125 126 127 128 129 130 131
  if (NULL == pRsp) {
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
132
      .msgType = rspType,
L
Liu Jicong 已提交
133 134 135
      .pCont = pRsp,
      .contLen = sizeof(*pRsp) + dataLength,
      .code = code,
136
      .info = *pConn,
D
dapan1121 已提交
137 138
  };

S
shm  
Shengliang Guan 已提交
139
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
140 141 142 143

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
144
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
D
dapan1121 已提交
145 146 147 148
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
149
      .msgType = TDMT_SCH_CANCEL_TASK_RSP,
L
Liu Jicong 已提交
150 151 152
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
153
      .info = *pConn,
D
dapan1121 已提交
154 155
  };

S
shm  
Shengliang Guan 已提交
156
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
157 158 159
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
160
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
D
dapan1121 已提交
161 162 163 164
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
165
      .msgType = TDMT_SCH_DROP_TASK_RSP,
L
Liu Jicong 已提交
166 167 168
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
169
      .info = *pConn,
D
dapan1121 已提交
170 171
  };

S
shm  
Shengliang Guan 已提交
172
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
173 174 175
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->header.vgId = mgmt->nodeId;
  req->sId = sId;
  req->queryId = qId;
  req->taskId = tId;
  req->refId = rId;
  req->execId = eId;

  SRpcMsg pNewMsg = {
      .msgType = TDMT_SCH_DROP_TASK,
      .pCont = req,
      .contLen = sizeof(STaskDropReq),
      .code = 0,
      .info = *pConn,
  };

  int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg);
  if (TSDB_CODE_SUCCESS != code) {
    QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
    rpcFreeCont(req);
    QW_ERR_RET(code);
  }

  QW_SCH_TASK_DLOG("drop task msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}


S
Shengliang Guan 已提交
211
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
L
Liu Jicong 已提交
212
  SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
D
dapan1121 已提交
213 214 215 216 217 218 219 220 221
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->header.vgId = mgmt->nodeId;
  req->sId = sId;
  req->queryId = qId;
  req->taskId = tId;
D
dapan1121 已提交
222
  req->execId = eId;
D
dapan1121 已提交
223 224

  SRpcMsg pNewMsg = {
D
dapan1121 已提交
225
      .msgType = TDMT_SCH_QUERY_CONTINUE,
L
Liu Jicong 已提交
226 227 228
      .pCont = req,
      .contLen = sizeof(SQueryContinueReq),
      .code = 0,
229
      .info = *pConn,
D
dapan1121 已提交
230 231
  };

S
Shengliang Guan 已提交
232
  int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
D
dapan1121 已提交
233
  if (TSDB_CODE_SUCCESS != code) {
H
Haojun Liao 已提交
234
    QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
D
dapan1121 已提交
235 236 237 238
    rpcFreeCont(req);
    QW_ERR_RET(code);
  }

D
dapan1121 已提交
239
  QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);
D
dapan1121 已提交
240 241 242 243

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
244
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
L
Liu Jicong 已提交
245
  STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
D
dapan1121 已提交
246 247 248
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
L
Liu Jicong 已提交
249
  }
D
dapan1121 已提交
250

D
dapan1121 已提交
251 252 253 254 255
  req->header.vgId = htonl(mgmt->nodeId);
  req->sId = htobe64(sId);
  req->queryId = htobe64(qId);
  req->taskId = htobe64(tId);
  req->refId = htobe64(rId);
L
Liu Jicong 已提交
256

S
Shengliang Guan 已提交
257
  SRpcMsg brokenMsg = {
D
dapan1121 已提交
258
      .msgType = TDMT_SCH_DROP_TASK,
L
Liu Jicong 已提交
259 260
      .pCont = req,
      .contLen = sizeof(STaskDropReq),
D
dapan1121 已提交
261
      .code = TSDB_CODE_RPC_BROKEN_LINK,
262
      .info = *pConn,
D
dapan1121 已提交
263
  };
L
Liu Jicong 已提交
264

S
Shengliang Guan 已提交
265
  tmsgRegisterBrokenLinkArg(&brokenMsg);
D
dapan1121 已提交
266 267 268 269

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
270
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) {
D
dapan1121 已提交
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("calloc %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    taosMemoryFree(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
L
Liu Jicong 已提交
290

S
Shengliang Guan 已提交
291
  SRpcMsg brokenMsg = {
D
dapan1121 已提交
292
      .msgType = TDMT_SCH_QUERY_HEARTBEAT,
L
Liu Jicong 已提交
293 294
      .pCont = msg,
      .contLen = msgSize,
D
dapan1121 已提交
295
      .code = TSDB_CODE_RPC_BROKEN_LINK,
296
      .info = *pConn,
D
dapan1121 已提交
297
  };
L
Liu Jicong 已提交
298

S
Shengliang Guan 已提交
299
  tmsgRegisterBrokenLinkArg(&brokenMsg);
D
dapan1121 已提交
300 301 302 303

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SSubQueryMsg *msg = pMsg->pCont;
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
322
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
323 324 325 326 327 328 329
  msg->phyLen = ntohl(msg->phyLen);
  msg->sqlLen = ntohl(msg->sqlLen);

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
D
dapan1121 已提交
330
  int32_t  eId = msg->execId;
D
dapan1121 已提交
331

D
dapan1121 已提交
332
  SQWMsg qwMsg = {.msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
D
dapan1121 已提交
333 334

  QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
D
dapan1121 已提交
335
  QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg));
D
dapan1121 已提交
336 337 338 339 340
  QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
341 342 343 344 345 346 347 348 349 350 351 352
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SSubQueryMsg *msg = pMsg->pCont;
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
D
dapan1121 已提交
353
  int32_t  eId = msg->execId;
D
dapan1121 已提交
354 355 356 357 358 359 360 361

  QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
  qwAbortPrerocessQuery(QW_FPARAMS());
  QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
362
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
363 364 365 366
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

L
Liu Jicong 已提交
367
  int32_t       code = 0;
D
dapan1121 已提交
368
  SSubQueryMsg *msg = pMsg->pCont;
dengyihao's avatar
dengyihao 已提交
369
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
370

D
dapan1121 已提交
371 372
  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
D
dapan1121 已提交
373

D
dapan1121 已提交
374
  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
D
dapan1121 已提交
375
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
376
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
377 378 379 380 381
  }

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
382
  int64_t  rId = msg->refId;
D
dapan1121 已提交
383
  int32_t  eId = msg->execId;
D
dapan1121 已提交
384

D
dapan1121 已提交
385
  SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType};
D
dapan1121 已提交
386 387 388 389
  qwMsg.msgInfo.explain = msg->explain;
  qwMsg.msgInfo.taskType = msg->taskType;
  qwMsg.msgInfo.needFetch = msg->needFetch;
  
dengyihao's avatar
dengyihao 已提交
390
  char * sql = strndup(msg->msg, msg->sqlLen);
D
dapan1121 已提交
391
  QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
D
dapan1121 已提交
392
  QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));
D
dapan1121 已提交
393
  QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
394

L
Liu Jicong 已提交
395
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
396 397
}

D
dapan1121 已提交
398
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
L
Liu Jicong 已提交
399 400 401
  int32_t            code = 0;
  int8_t             status = 0;
  bool               queryDone = false;
402
  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
L
Liu Jicong 已提交
403
  bool               needStop = false;
dengyihao's avatar
dengyihao 已提交
404 405
  SQWTaskCtx *       handles = NULL;
  SQWorker *         mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
406

D
dapan1121 已提交
407 408
  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);
D
dapan1121 已提交
409

D
dapan1121 已提交
410
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
411
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
412
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
413 414
  }

415 416 417
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
L
Liu Jicong 已提交
418
  int64_t  rId = 0;
D
dapan1121 已提交
419
  int32_t  eId = msg->execId;
D
dapan1121 已提交
420

421
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
D
dapan1121 已提交
422

S
Shengliang Guan 已提交
423
  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);
424 425 426

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
427
  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);
428

L
Liu Jicong 已提交
429
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
430 431
}

D
dapan1121 已提交
432
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
433 434 435 436 437
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SResFetchReq *msg = pMsg->pCont;
dengyihao's avatar
dengyihao 已提交
438
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
439

D
dapan1121 已提交
440 441
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
D
dapan1121 已提交
442

D
dapan1121 已提交
443
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
L
Liu Jicong 已提交
444
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
445
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
446
  }
D
dapan1121 已提交
447

D
dapan1121 已提交
448 449 450
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
D
dapan1121 已提交
451
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
452 453 454 455

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
L
Liu Jicong 已提交
456
  int64_t  rId = 0;
D
dapan1121 已提交
457
  int32_t  eId = msg->execId;
D
dapan1121 已提交
458

D
dapan1121 已提交
459
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
D
dapan1121 已提交
460

S
Shengliang Guan 已提交
461
  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
462 463 464

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
465
  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);
466

L
Liu Jicong 已提交
467
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
468 469
}

D
dapan1121 已提交
470
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
471
  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
472 473
  if (mgmt) {
    qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
D
dapan1121 已提交
474
    QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
D
dapan1121 已提交
475
  }
D
dapan1121 已提交
476

D
dapan1121 已提交
477
  qProcessRspMsg(NULL, pMsg, NULL);
478
  pMsg->pCont = NULL;
S
Shengliang 已提交
479 480 481
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
482
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
483 484 485 486
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

dengyihao's avatar
dengyihao 已提交
487
  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
488
  int32_t         code = 0;
D
dapan1121 已提交
489
  STaskCancelReq *msg = pMsg->pCont;
D
dapan1121 已提交
490

D
dapan1121 已提交
491 492
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);
D
dapan1121 已提交
493

D
dapan1121 已提交
494
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
L
Liu Jicong 已提交
495
    qError("invalid task cancel msg");
D
dapan1121 已提交
496
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
497
  }
D
dapan1121 已提交
498

D
dapan1121 已提交
499 500 501 502
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
503
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
504

D
dapan1121 已提交
505 506 507 508
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
D
dapan1121 已提交
509
  int32_t  eId = msg->execId;
D
dapan1121 已提交
510

511
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
D
dapan1121 已提交
512

L
Liu Jicong 已提交
513
  // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
D
dapan1121 已提交
514 515 516

_return:

S
shm  
Shengliang Guan 已提交
517
  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
D
dapan1121 已提交
518
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
519 520 521 522

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
523
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
524 525 526 527
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

L
Liu Jicong 已提交
528
  int32_t       code = 0;
D
dapan1121 已提交
529
  STaskDropReq *msg = pMsg->pCont;
dengyihao's avatar
dengyihao 已提交
530
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
531

D
dapan1121 已提交
532 533
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
D
dapan1121 已提交
534

D
dapan1121 已提交
535
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
536
    QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
537
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
538
  }
D
dapan1121 已提交
539

D
dapan1121 已提交
540 541 542
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
D
dapan1121 已提交
543
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
544
  msg->execId = ntohl(msg->execId);
D
dapan1121 已提交
545 546 547 548

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
549
  int64_t  rId = msg->refId;
D
dapan1121 已提交
550
  int32_t  eId = msg->execId;
D
dapan1121 已提交
551

552
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
D
dapan1121 已提交
553

D
dapan1121 已提交
554
  if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
L
Liu Jicong 已提交
555
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
D
dapan1121 已提交
556
  }
D
dapan1121 已提交
557

S
Shengliang Guan 已提交
558
  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);
559 560 561

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
562
  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);
563 564

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
565 566
}

D
dapan1121 已提交
567
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
D
dapan1121 已提交
568 569 570 571
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

L
Liu Jicong 已提交
572
  int32_t         code = 0;
D
dapan1121 已提交
573
  SSchedulerHbReq req = {0};
dengyihao's avatar
dengyihao 已提交
574
  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
575

D
dapan1121 已提交
576 577
  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);
D
dapan1121 已提交
578

D
dapan1121 已提交
579 580 581
  if (NULL == pMsg->pCont) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
582
  }
D
dapan1121 已提交
583 584 585 586 587 588 589 590

  if (tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  uint64_t sId = req.sId;
591
  SQWMsg   qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
D
dapan1121 已提交
592
  if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
L
Liu Jicong 已提交
593
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
D
dapan1121 已提交
594 595
  }

S
Shengliang Guan 已提交
596
  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);
D
dapan1121 已提交
597 598 599 600 601 602 603

  QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
604 605


606 607
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
D
dapan1121 已提交
608 609 610 611 612 613 614 615 616 617 618 619 620 621 622
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SVDeleteReq req = {0};
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);

  tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req);
  
  uint64_t sId = req.sId;
  uint64_t qId = req.queryId;
  uint64_t tId = req.taskId;
  int64_t  rId = 0;
D
dapan1121 已提交
623
  int32_t  eId = -1;
D
dapan1121 已提交
624 625 626 627 628

  SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
  QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
  taosMemoryFreeClear(req.sql);

629
  QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));
D
dapan1121 已提交
630 631 632 633 634 635 636 637 638

  QW_SCH_TASK_DLOG("processDelete end, node:%p", node);

_return:

  QW_RET(code);
}