qwMsg.c 19.0 KB
Newer Older
D
dapan1121 已提交
1
#include "qwMsg.h"
L
Liu Jicong 已提交
2
#include "dataSinkMgt.h"
D
dapan1121 已提交
3 4 5
#include "executor.h"
#include "planner.h"
#include "query.h"
D
dapan1121 已提交
6
#include "qwInt.h"
H
Hongze Cheng 已提交
7
#include "qworker.h"
L
Liu Jicong 已提交
8
#include "tcommon.h"
D
dapan1121 已提交
9 10
#include "tmsg.h"
#include "tname.h"
11
#include "tgrant.h"
D
dapan1121 已提交
12

D
dapan1121 已提交
13
/*
 * Create or grow the buffer that holds a fetch response.
 *
 * rpcMalloc  non-zero: resize with rpcReallocCont; zero: resize with taosMemoryRealloc
 * length     number of bytes wanted after the SRetrieveTableRsp header
 * rsp        in/out buffer pointer; *rsp may be NULL to request a new buffer
 *
 * When *rsp is NULL on entry, the header part of the new buffer is zeroed.
 * On failure *rsp is left unchanged and TSDB_CODE_QRY_OUT_OF_MEMORY is
 * returned through QW_RET.
 */
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) {
  int32_t            msgSize = sizeof(SRetrieveTableRsp) + length;
  bool               firstAlloc = (NULL == *rsp);
  SRetrieveTableRsp *pResized = NULL;

  if (rpcMalloc) {
    pResized = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize);
  } else {
    pResized = (SRetrieveTableRsp *)taosMemoryRealloc(*rsp, msgSize);
  }

  if (NULL == pResized) {
    qError("rpcMallocCont %d failed", msgSize);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Only a brand-new buffer gets its header cleared; a resized one keeps its contents.
  if (firstAlloc) {
    memset(pResized, 0, sizeof(SRetrieveTableRsp));
  }

  *rsp = pResized;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
32
/*
 * Fill the SRetrieveTableRsp header located at `msg` from `input`.
 *
 * msg        buffer that starts with an SRetrieveTableRsp
 * input      source of the row/column/block counters and flags
 * len        value stored (via htonl) in compLen
 * qComplete  stored in the `completed` field
 *
 * Counters are written with htonl and useconds with htobe64; the remaining
 * fields are copied as-is.
 */
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
  SRetrieveTableRsp *pHdr = (SRetrieveTableRsp *)msg;

  // fields copied without conversion
  pHdr->completed = qComplete;
  pHdr->precision = input->precision;
  pHdr->compressed = input->compressed;

  // fields converted before being stored
  pHdr->useconds = htobe64(input->useconds);
  pHdr->compLen = htonl(len);
  pHdr->numOfRows = htonl(input->numOfRows);
  pHdr->numOfCols = htonl(input->numOfCols);
  pHdr->numOfBlocks = htonl(input->numOfBlocks);
}

/*
 * Release a fetch response buffer with rpcFreeCont.
 * A NULL pointer is ignored.
 */
void qwFreeFetchRsp(void *msg) {
  if (NULL == msg) {
    return;
  }

  rpcFreeCont(msg);
}

D
dapan1121 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63 64
/*
 * Send a response that carries only an error code (no body).
 *
 * rspType  message type of the response
 * pConn    connection info copied into the response
 * code     code placed in the response
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code) {
  SRpcMsg errRsp = {0};

  errRsp.msgType = rspType;
  errRsp.pCont = NULL;
  errRsp.contLen = 0;
  errRsp.code = code;
  errRsp.info = *pConn;

  tmsgSendRsp(&errRsp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
65
/*
 * Build an SQueryTableRsp and send it as the reply to a query request.
 *
 * rspType  message type of the response
 * pConn    connection info copied into the response
 * code     result code; stored both in the body (htonl) and in the rpc message
 * ctx      optional task context; when non-NULL its affectedRows and table
 *          version info (tbFName / sversion / tversion) are copied into the body
 *
 * Returns TSDB_CODE_SUCCESS after handing the message to tmsgSendRsp, or
 * TSDB_CODE_QRY_OUT_OF_MEMORY when the body cannot be allocated (in which case
 * nothing is sent).
 */
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
  STbVerInfo     *tbInfo = ctx ? &ctx->tbInfo : NULL;
  int64_t         affectedRows = ctx ? ctx->affectedRows : 0;
  SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
  if (NULL == pRsp) {
    // Previously the NULL result was dereferenced right away.
    qError("rpcMallocCont %d failed", (int32_t)sizeof(SQueryTableRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pRsp->code = htonl(code);
  pRsp->affectedRows = htobe64(affectedRows);
  if (tbInfo) {
    // NOTE(review): unbounded copy; assumes tbInfo->tbFName always fits pRsp->tbFName - confirm both sizes match.
    strcpy(pRsp->tbFName, tbInfo->tbFName);
    pRsp->sversion = htonl(tbInfo->sversion);
    pRsp->tversion = htonl(tbInfo->tversion);
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
90 91 92
/*
 * Serialize the explain execution info in pExecList and send it as a
 * TDMT_SCH_EXPLAIN_RSP message.
 *
 * pConn      connection info copied into the response (ahandle is cleared)
 * pExecList  array of SExplainExecInfo; its size becomes numOfPlans
 *
 * Returns TSDB_CODE_SUCCESS after handing the message to tmsgSendRsp, or
 * TSDB_CODE_QRY_OUT_OF_MEMORY when sizing, allocation or serialization fails
 * (nothing is sent in that case).
 */
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList) {
  SExplainExecInfo *pInfo = taosArrayGet(pExecList, 0);
  SExplainRsp       rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    qError("tSerializeSExplainRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSExplainRsp(pRsp, contLen, &rsp) < 0) {
    qError("tSerializeSExplainRsp failed, size:%d", contLen);
    rpcFreeCont(pRsp);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_EXPLAIN_RSP,
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
      .info = *pConn,
  };

  rpcRsp.info.ahandle = NULL;
  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
111
/*
 * Serialize a scheduler heartbeat status and send it as a
 * TDMT_SCH_QUERY_HEARTBEAT_RSP message.
 *
 * pConn    connection info copied into the response
 * pStatus  heartbeat response content to serialize
 * code     code placed in the rpc message
 *
 * Returns TSDB_CODE_SUCCESS after handing the message to tmsgSendRsp, or
 * TSDB_CODE_QRY_OUT_OF_MEMORY when sizing, allocation or serialization fails
 * (nothing is sent in that case).
 */
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
  if (contLen < 0) {
    qError("tSerializeSSchedulerHbRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus) < 0) {
    qError("tSerializeSSchedulerHbRsp failed, size:%d", contLen);
    rpcFreeCont(pRsp);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP,
      .contLen = contLen,
      .pCont = pRsp,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
129 130
/*
 * Send a fetch response.
 *
 * rspType     message type of the response
 * pConn       connection info copied into the response
 * pRsp        prepared response buffer, or NULL to send an all-zero header
 * dataLength  number of payload bytes that follow the header in pRsp
 *             (forced to 0 when pRsp is NULL)
 * code        code placed in the rpc message
 *
 * Returns TSDB_CODE_SUCCESS after handing the message to tmsgSendRsp, or
 * TSDB_CODE_QRY_OUT_OF_MEMORY when the placeholder header cannot be allocated
 * (nothing is sent in that case).
 */
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
                               int32_t code) {
  if (NULL == pRsp) {
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (NULL == pRsp) {
      // Previously memset was applied to the unchecked allocation result.
      qError("rpcMallocCont %d failed", (int32_t)sizeof(SRetrieveTableRsp));
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp) + dataLength,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
150
#if 0
S
Shengliang Guan 已提交
151
/*
 * Build an STaskCancelRsp carrying `code` and send it as TDMT_SCH_CANCEL_TASK_RSP.
 * This function sits inside an `#if 0` region and is currently compiled out.
 */
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
  STaskCancelRsp *pCancelRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  pCancelRsp->code = code;

  SRpcMsg cancelRsp = {0};
  cancelRsp.msgType = TDMT_SCH_CANCEL_TASK_RSP;
  cancelRsp.pCont = pCancelRsp;
  cancelRsp.contLen = sizeof(*pCancelRsp);
  cancelRsp.code = code;
  cancelRsp.info = *pConn;

  tmsgSendRsp(&cancelRsp);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
167
/*
 * Build an STaskDropRsp carrying `code` and send it as TDMT_SCH_DROP_TASK_RSP.
 * This function sits inside an `#if 0` region and is currently compiled out.
 */
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
  STaskDropRsp *pDropRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  pDropRsp->code = code;

  SRpcMsg dropRsp = {0};
  dropRsp.msgType = TDMT_SCH_DROP_TASK_RSP;
  dropRsp.pCont = pDropRsp;
  dropRsp.contLen = sizeof(*pDropRsp);
  dropRsp.code = code;
  dropRsp.info = *pConn;

  tmsgSendRsp(&dropRsp);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
182
#endif
D
dapan1121 已提交
183

D
dapan1121 已提交
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
/*
 * Build a TDMT_SCH_DROP_TASK request for the task identified by the
 * QW_FPARAMS_DEF arguments and put it on this worker's FETCH_QUEUE.
 *
 * pConn  connection info copied into the queued message
 *
 * The task identifiers are stored in network byte order because the handler
 * for TDMT_SCH_DROP_TASK in this file (qWorkerProcessDropMsg) always converts
 * them with be64toh/ntohl; storing them in host order would make the handler
 * see byte-swapped ids on little-endian hosts. header.vgId is kept in host
 * order, matching qwBuildAndSendCQueryMsg which uses the same local-queue path.
 *
 * Returns TSDB_CODE_SUCCESS when the message was queued,
 * TSDB_CODE_QRY_OUT_OF_MEMORY when the request cannot be allocated, or the
 * code reported by tmsgPutToQueue.
 */
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->header.vgId = mgmt->nodeId;
  req->sId = htobe64(sId);
  req->queryId = htobe64(qId);
  req->taskId = htobe64(tId);
  req->refId = htobe64(rId);
  req->execId = htonl(eId);

  SRpcMsg pNewMsg = {
      .msgType = TDMT_SCH_DROP_TASK,
      .pCont = req,
      .contLen = sizeof(STaskDropReq),
      .code = 0,
      .info = *pConn,
  };

  // NOTE(review): ownership of req after a failed tmsgPutToQueue is not visible here - confirm who frees it.
  int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg);
  if (TSDB_CODE_SUCCESS != code) {
    QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
    QW_ERR_RET(code);
  }

  QW_SCH_TASK_DLOG("drop task msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
217
/*
 * Build a TDMT_SCH_QUERY_CONTINUE request for the task identified by the
 * QW_FPARAMS_DEF arguments and put it on this worker's QUERY_QUEUE.
 *
 * pConn  connection info copied into the queued message
 *
 * Returns TSDB_CODE_SUCCESS when the message was queued,
 * TSDB_CODE_QRY_OUT_OF_MEMORY when the request cannot be allocated, or the
 * code reported by tmsgPutToQueue.
 */
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  SQueryContinueReq *pContinue = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
  if (NULL == pContinue) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Identify the target vnode and task.
  pContinue->header.vgId = mgmt->nodeId;
  pContinue->sId = sId;
  pContinue->queryId = qId;
  pContinue->taskId = tId;
  pContinue->execId = eId;

  SRpcMsg continueMsg = {0};
  continueMsg.msgType = TDMT_SCH_QUERY_CONTINUE;
  continueMsg.pCont = pContinue;
  continueMsg.contLen = sizeof(SQueryContinueReq);
  continueMsg.code = 0;
  continueMsg.info = *pConn;

  int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &continueMsg);
  if (TSDB_CODE_SUCCESS == code) {
    QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);
    return TSDB_CODE_SUCCESS;
  }

  QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
  QW_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
249
/*
 * Register the message to deliver when the connection of a query breaks: a
 * TDMT_SCH_DROP_TASK request (code TSDB_CODE_RPC_BROKEN_LINK) for the task
 * identified by the QW_FPARAMS_DEF arguments.
 *
 * pConn  connection info copied into the registered message
 *
 * All request fields are stored in network byte order. execId is now filled in
 * as well: the drop handler in this file reads msg->execId (via ntohl) and
 * passes it on as part of the task identity, so leaving it unset would make a
 * broken-link drop address execution id 0 (or whatever the buffer held).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the request
 * cannot be allocated.
 */
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->header.vgId = htonl(mgmt->nodeId);
  req->sId = htobe64(sId);
  req->queryId = htobe64(qId);
  req->taskId = htobe64(tId);
  req->refId = htobe64(rId);
  req->execId = htonl(eId);

  SRpcMsg brokenMsg = {
      .msgType = TDMT_SCH_DROP_TASK,
      .pCont = req,
      .contLen = sizeof(STaskDropReq),
      .code = TSDB_CODE_RPC_BROKEN_LINK,
      .info = *pConn,
  };

  tmsgRegisterBrokenLinkArg(&brokenMsg);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
275
/*
 * Register the message to deliver when a scheduler heartbeat connection
 * breaks: a serialized SSchedulerHbReq sent as TDMT_SCH_QUERY_HEARTBEAT with
 * code TSDB_CODE_RPC_BROKEN_LINK.
 *
 * mgmt   worker whose nodeId is placed in the request header
 * sId    scheduler id placed in the request
 * pConn  connection info copied into the registered message
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when sizing,
 * allocation or serialization of the request fails.
 */
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) {
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  // First pass with a NULL buffer only computes the encoded size.
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("rpcMallocCont %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    // The buffer came from rpcMallocCont, so it must go back through rpcFreeCont.
    rpcFreeCont(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg brokenMsg = {
      .msgType = TDMT_SCH_QUERY_HEARTBEAT,
      .pCont = msg,
      .contLen = msgSize,
      .code = TSDB_CODE_RPC_BROKEN_LINK,
      .info = *pConn,
  };

  tmsgRegisterBrokenLinkArg(&brokenMsg);

  return TSDB_CODE_SUCCESS;
}

309
/*
 * Validate an incoming sub-query message, convert its header fields from
 * network to host byte order IN PLACE, optionally check the grant, and run
 * qwPreprocessQuery.
 *
 * qWorkerMgmt  the SQWorker handling the message
 * pMsg         rpc message whose pCont is an SSubQueryMsg followed by payload
 * chkGrant     when true, messages without the show-rewrite mask are rejected
 *              with TSDB_CODE_GRANT_EXPIRED if grantCheck(TSDB_GRANT_TIME) fails
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a NULL/short or
 * inconsistent message, TSDB_CODE_GRANT_EXPIRED, or the qwPreprocessQuery error.
 */
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGrant) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SSubQueryMsg *msg = pMsg->pCont;
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;

  // Compare as signed 64-bit: comparing contLen directly with sizeof would
  // promote a negative length to a huge unsigned value and let it through.
  if (NULL == msg || (int64_t)pMsg->contLen <= (int64_t)sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
  msg->execId = ntohl(msg->execId);
  msg->phyLen = ntohl(msg->phyLen);
  msg->sqlLen = ntohl(msg->sqlLen);
  msg->msgMask = ntohl(msg->msgMask);

  // The payload is addressed as msg->msg[0 .. sqlLen) followed by phyLen bytes,
  // so both lengths must fit in what was actually received. The casts make a
  // negative length (if the fields are signed) look huge and get rejected; the
  // two-step comparison avoids overflow in the sum.
  uint64_t payloadLen = (uint64_t)pMsg->contLen - (uint64_t)sizeof(*msg);
  if ((uint64_t)msg->sqlLen > payloadLen || (uint64_t)msg->phyLen > payloadLen - (uint64_t)msg->sqlLen) {
    QW_ELOG("invalid query msg, sqlLen:%d, phyLen:%d, msgLen:%d", (int32_t)msg->sqlLen, (int32_t)msg->phyLen,
            pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg->msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
    QW_ELOG("query failed cause of grant expired, msgMask:%d", msg->msgMask);
    QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
  }

  // These locals are consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {
      .msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};

  QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
  QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg));
  QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
353 354 355 356 357 358
/*
 * Undo the preprocessing of a sub-query message by calling
 * qwAbortPrerocessQuery for the task named in the message header.
 *
 * qWorkerMgmt  the SQWorker handling the message
 * pMsg         rpc message whose pCont is an SSubQueryMsg; the header fields
 *              are read as-is (no byte-order conversion happens here)
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_INVALID_INPUT when an argument
 * is NULL or the message is missing/too short to hold the header.
 */
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SSubQueryMsg *msg = pMsg->pCont;
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;

  // Same validation as the preprocess step; previously msg was dereferenced unchecked.
  if (NULL == msg || (int64_t)pMsg->contLen <= (int64_t)sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // These locals are consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
  int32_t  eId = msg->execId;

  QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
  qwAbortPrerocessQuery(QW_FPARAMS());
  QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
374
/*
 * Handle a sub-query message taken from the query queue.
 *
 * node         opaque node handle forwarded to qwProcessQuery
 * qWorkerMgmt  the SQWorker handling the message
 * pMsg         rpc message whose pCont is an SSubQueryMsg; header fields are
 *              read as-is, with no byte-order conversion in this function
 * ts           timestamp forwarded to qwUpdateTimeInQueue
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or a NULL/short
 * message, otherwise the result of qwProcessQuery.
 */
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
  SSubQueryMsg *msg = pMsg->pCont;

  // Queue statistics are updated before the message itself is validated.
  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);

  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // These locals are consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
  int32_t  eId = msg->execId;

  // The payload holds sqlLen bytes of SQL text followed by phyLen bytes handed on as the message body.
  SQWMsg qwMsg = {.msgType = pMsg->msgType,
                  .node = node,
                  .connInfo = pMsg->info,
                  .msg = msg->msg + msg->sqlLen,
                  .msgLen = msg->phyLen};
  qwMsg.msgInfo.taskType = msg->taskType;
  qwMsg.msgInfo.explain = msg->explain;
  qwMsg.msgInfo.needFetch = msg->needFetch;

  // NOTE(review): the strndup result is not checked and is not freed here;
  // presumably qwProcessQuery takes ownership of it - confirm.
  char *sql = strndup(msg->msg, msg->sqlLen);
  QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType),
                   pMsg->info.handle, sql);
  QW_ERR_JRET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));

_return:

  QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%d", node, code);

  return code;
}

D
dapan1121 已提交
418
/*
 * Handle a query-continue message taken from the query queue.
 *
 * node         opaque node handle forwarded to qwProcessCQuery
 * qWorkerMgmt  the SQWorker handling the message
 * pMsg         rpc message whose pCont is an SQueryContinueReq; fields are
 *              read as-is (no byte-order conversion)
 * ts           timestamp forwarded to qwUpdateTimeInQueue
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a NULL worker,
 * NULL message or short content, or the qwProcessCQuery error.
 */
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  // Guard before pMsg/mgmt are dereferenced, as the sibling handlers do.
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
  SQWorker          *mgmt = (SQWorker *)qWorkerMgmt;

  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // These locals are consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  // The continue request carries no ref id, so rId is fixed to 0.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = 0;
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};

  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
452
/*
 * Handle a fetch request taken from the fetch queue.
 *
 * node         opaque node handle forwarded to qwProcessFetch
 * qWorkerMgmt  the SQWorker handling the message
 * pMsg         rpc message whose pCont is an SResFetchReq in network byte
 *              order; the id fields are converted to host order IN PLACE
 * ts           timestamp forwarded to qwUpdateTimeInQueue
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or
 * a NULL/short message, or the qwProcessFetch error.
 */
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
  SResFetchReq *msg = pMsg->pCont;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Convert in place, then take local copies for QW_FPARAMS() and the log macros.
  // The fetch request carries no ref id, so rId is fixed to 0.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;
  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;
  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;
  msg->execId = ntohl(msg->execId);
  int32_t eId = msg->execId;
  int64_t rId = 0;

  SQWMsg qwMsg = {.msgType = pMsg->msgType, .connInfo = pMsg->info, .node = node, .msg = NULL, .msgLen = 0};

  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
490
/*
 * Handle a response message taken from the fetch queue by forwarding it to
 * qProcessRspMsg.
 *
 * node         unused here
 * qWorkerMgmt  optional SQWorker; when non-NULL its queue timing and
 *              rspProcessed counter are updated
 * pMsg         the response message; pCont is reset to NULL after forwarding
 * ts           timestamp forwarded to qwUpdateTimeInQueue
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;

  if (NULL != mgmt) {
    qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
    QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
  }

  qProcessRspMsg(NULL, pMsg, NULL);

  // NOTE(review): presumably qProcessRspMsg consumes the content, hence the pointer is dropped - confirm.
  pMsg->pCont = NULL;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
502
#if 0
D
dapan1121 已提交
503
/*
 * Handle a task-cancel request: validate it, convert its id fields to host
 * byte order in place, and send a cancel response.
 * This function sits inside an `#if 0` region and is currently compiled out;
 * the actual cancel call is commented out in the body.
 */
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t         code = 0;
  STaskCancelReq *msg = pMsg->pCont;
  SQWorker       *mgmt = (SQWorker *)qWorkerMgmt;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    qError("invalid task cancel msg");
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Convert in place, then take local copies for the QW_* macros.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;
  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;
  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;
  msg->refId = be64toh(msg->refId);
  int64_t rId = msg->refId;
  msg->execId = ntohl(msg->execId);
  int32_t eId = msg->execId;

  SQWMsg qwMsg = {.connInfo = pMsg->info, .node = node, .msg = NULL, .msgLen = 0};

  // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));

_return:

  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
543
#endif
D
dapan1121 已提交
544

D
dapan1121 已提交
545
/*
 * Handle a task-drop request taken from the fetch queue.
 *
 * node         opaque node handle forwarded to qwProcessDrop
 * qWorkerMgmt  the SQWorker handling the message
 * pMsg         rpc message whose pCont is an STaskDropReq in network byte
 *              order; the id fields are converted to host order IN PLACE.
 *              pMsg->code is forwarded; TSDB_CODE_RPC_BROKEN_LINK is logged.
 * ts           timestamp forwarded to qwUpdateTimeInQueue
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or
 * a NULL/short message, or the qwProcessDrop error.
 */
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t       code = 0;
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
  STaskDropReq *msg = pMsg->pCont;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Convert in place, then take local copies for QW_FPARAMS() and the log macros.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;
  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;
  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;
  msg->refId = be64toh(msg->refId);
  int64_t rId = msg->refId;
  msg->execId = ntohl(msg->execId);
  int32_t eId = msg->execId;

  SQWMsg qwMsg = {.code = pMsg->code, .connInfo = pMsg->info, .node = node, .msg = NULL, .msgLen = 0};

  if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
589
/*
 * Handle a scheduler heartbeat request taken from the fetch queue.
 *
 * node         opaque node handle placed in the SQWMsg
 * qWorkerMgmt  the SQWorker handling the message
 * pMsg         rpc message whose content is a serialized SSchedulerHbReq.
 *              pMsg->code is forwarded; TSDB_CODE_RPC_BROKEN_LINK is logged.
 * ts           timestamp forwarded to qwUpdateTimeInQueue
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments,
 * missing content or a request that fails to deserialize, or the qwProcessHb
 * error.
 */
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t         code = 0;
  SQWorker       *mgmt = (SQWorker *)qWorkerMgmt;
  SSchedulerHbReq req = {0};

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);

  if (NULL == pMsg->pCont) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // A non-zero result from the deserializer is treated as failure.
  if (0 != tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // sId is consumed by the QW_SCH_* log macros below.
  uint64_t sId = req.sId;
  SQWMsg   qwMsg = {.code = pMsg->code, .connInfo = pMsg->info, .node = node, .msg = NULL, .msgLen = 0};

  if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
626

627 628
/*
 * Handle a delete request: deserialize it, run qwProcessDelete and release
 * the buffers that deserialization allocated.
 *
 * node         opaque node handle forwarded to qwProcessDelete
 * qWorkerMgmt  the SQWorker handling the message
 * pMsg         rpc message whose content is a serialized SVDeleteReq
 * pRes         output forwarded to qwProcessDelete
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments,
 * missing content or a request that fails to deserialize, or the
 * qwProcessDelete error.
 */
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t     code = 0;
  SVDeleteReq req = {0};
  SQWorker   *mgmt = (SQWorker *)qWorkerMgmt;

  QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);

  // The deserialize result used to be ignored, so a malformed request was
  // processed with whatever fields happened to be filled in.
  if (NULL == pMsg->pCont || tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    QW_ELOG("invalid delete msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    taosMemoryFreeClear(req.sql);
    taosMemoryFreeClear(req.msg);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // These locals are consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = req.sId;
  uint64_t qId = req.queryId;
  uint64_t tId = req.taskId;
  int64_t  rId = 0;
  int32_t  eId = -1;

  SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
  QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
  taosMemoryFreeClear(req.sql);

  QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));

  QW_SCH_TASK_DLOG("processDelete end, node:%p", node);

_return:

  // Released on both paths; previously a qwProcessDelete failure jumped past the free and leaked req.msg.
  taosMemoryFreeClear(req.msg);

  QW_RET(code);
}