qwMsg.c 19.9 KB
Newer Older
D
dapan1121 已提交
1
#include "qwMsg.h"
L
Liu Jicong 已提交
2
#include "dataSinkMgt.h"
D
dapan1121 已提交
3 4 5
#include "executor.h"
#include "planner.h"
#include "query.h"
D
dapan1121 已提交
6
#include "qwInt.h"
H
Hongze Cheng 已提交
7
#include "qworker.h"
L
Liu Jicong 已提交
8
#include "tcommon.h"
D
dapan1121 已提交
9 10
#include "tmsg.h"
#include "tname.h"
11
#include "tgrant.h"
D
dapan1121 已提交
12

D
dapan1121 已提交
13
/**
 * Create or grow a fetch response buffer so that it holds an
 * SRetrieveTableRsp header followed by `length` payload bytes.
 *
 * @param rpcMalloc  non-zero: resize with rpcReallocCont();
 *                   zero: resize with taosMemoryRealloc().
 * @param length     number of payload bytes wanted after the header.
 * @param rsp        in/out buffer pointer; *rsp may be NULL for a first
 *                   allocation and is left untouched when resizing fails.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the
 *         resize call returns NULL.
 */
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) {
  int32_t totalLen = sizeof(SRetrieveTableRsp) + length;
  void   *resized = NULL;

  if (rpcMalloc) {
    resized = rpcReallocCont(*rsp, totalLen);
  } else {
    resized = taosMemoryRealloc(*rsp, totalLen);
  }

  if (NULL == resized) {
    qError("rpcMallocCont %d failed", totalLen);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  /* A brand-new buffer gets a zeroed header; a grown one keeps its contents. */
  if (NULL == *rsp) {
    memset(resized, 0, sizeof(SRetrieveTableRsp));
  }

  *rsp = (SRetrieveTableRsp *)resized;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
32
/**
 * Fill in the SRetrieveTableRsp header located at the start of `msg`.
 *
 * @param msg        buffer whose first bytes are treated as SRetrieveTableRsp.
 * @param input      source of timing, precision, compression and count values.
 * @param len        value stored (after htonl) in the compLen field.
 * @param qComplete  stored in the `completed` field.
 */
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
  SRetrieveTableRsp *pHead = (SRetrieveTableRsp *)msg;

  /* Fields copied without byte-order conversion. */
  pHead->completed = qComplete;
  pHead->precision = input->precision;
  pHead->compressed = input->compressed;

  /* Fields converted with htonl()/htobe64() before being stored. */
  pHead->useconds = htobe64(input->useconds);
  pHead->compLen = htonl(len);
  pHead->numOfRows = htonl(input->numOfRows);
  pHead->numOfCols = htonl(input->numOfCols);
  pHead->numOfBlocks = htonl(input->numOfBlocks);
}

/** Release a fetch response buffer with rpcFreeCont(); a NULL `msg` is ignored. */
void qwFreeFetchRsp(void *msg) {
  if (NULL == msg) {
    return;
  }

  rpcFreeCont(msg);
}

D
dapan1121 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63 64
/**
 * Send a reply that carries only a result code and no payload.
 *
 * @param rspType  message type of the reply.
 * @param pConn    connection info copied into the reply; dereferenced unconditionally.
 * @param code     result code placed in the reply.
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code) {
  SRpcMsg errRsp = {0};

  errRsp.msgType = rspType;
  errRsp.pCont = NULL;
  errRsp.contLen = 0;
  errRsp.code = code;
  errRsp.info = *pConn;

  tmsgSendRsp(&errRsp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
65
/**
 * Serialize an SQueryTableRsp and send it back on `pConn`.
 *
 * @param rspType  message type placed in the outgoing SRpcMsg.
 * @param pConn    connection info copied into the reply; dereferenced unconditionally.
 * @param code     result code, stored both in the payload and in the RPC message.
 * @param ctx      optional task context; when non-NULL its table version info
 *                 and affectedRows are copied into the payload.
 * @return TSDB_CODE_SUCCESS once the reply is handed to tmsgSendRsp(), or
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if sizing, allocation or serialization fails.
 */
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
  STbVerInfo    *tbInfo = ctx ? &ctx->tbInfo : NULL;
  int64_t        affectedRows = ctx ? ctx->affectedRows : 0;
  SQueryTableRsp rsp = {0};
  rsp.code = code;
  rsp.affectedRows = affectedRows;

  if (tbInfo) {
    /* NOTE(review): unbounded copy; assumes rsp.tbFName can hold tbInfo->tbFName - confirm. */
    strcpy(rsp.tbFName, tbInfo->tbFName);
    rsp.sversion = tbInfo->sversion;
    rsp.tversion = tbInfo->tversion;
  }

  /* First pass with a NULL buffer only computes the encoded size. */
  int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp);
  if (msgSize < 0) {
    qError("tSerializeSQueryTableRsp failed");
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(msgSize);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", msgSize);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSQueryTableRsp(pRsp, msgSize, &rsp) < 0) {
    qError("tSerializeSQueryTableRsp %d failed", msgSize);
    /* The buffer was never handed to the transport, so release it here. */
    rpcFreeCont(pRsp);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = pRsp,
      .contLen = msgSize,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
108 109 110
/**
 * Serialize the explain execution info in `pExecList` and send it as a
 * TDMT_SCH_EXPLAIN_RSP on `pConn`.
 *
 * @param pConn      connection info copied into the reply; its ahandle is
 *                   cleared in the copy before sending.
 * @param pExecList  array of SExplainExecInfo; element 0 is used as the start
 *                   of the sub-plan info and the array size as the plan count.
 * @return TSDB_CODE_SUCCESS after the reply is handed to tmsgSendRsp(), or
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if sizing, allocation or serialization fails.
 */
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList) {
  SExplainExecInfo *pInfo = taosArrayGet(pExecList, 0);
  SExplainRsp       rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo};

  /* First pass with a NULL buffer only computes the encoded size. */
  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    qError("tSerializeSExplainRsp failed");
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSExplainRsp(pRsp, contLen, &rsp) < 0) {
    qError("tSerializeSExplainRsp %d failed", contLen);
    rpcFreeCont(pRsp);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_EXPLAIN_RSP,
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
      .info = *pConn,
  };

  rpcRsp.info.ahandle = NULL;
  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
129
/**
 * Serialize a scheduler heartbeat status and send it as a
 * TDMT_SCH_QUERY_HEARTBEAT_RSP on `pConn`.
 *
 * @param pConn    connection info copied into the reply; dereferenced unconditionally.
 * @param pStatus  heartbeat status to serialize.
 * @param code     result code placed in the reply.
 * @return TSDB_CODE_SUCCESS after the reply is handed to tmsgSendRsp(), or
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if sizing, allocation or serialization fails.
 */
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
  /* First pass with a NULL buffer only computes the encoded size. */
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
  if (contLen < 0) {
    qError("tSerializeSSchedulerHbRsp failed");
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus) < 0) {
    qError("tSerializeSSchedulerHbRsp %d failed", contLen);
    rpcFreeCont(pRsp);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP,
      .contLen = contLen,
      .pCont = pRsp,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
147 148
/**
 * Send a fetch reply on `pConn`.
 *
 * @param rspType     message type of the reply.
 * @param pConn       connection info copied into the reply; dereferenced unconditionally.
 * @param pRsp        prepared response buffer, or NULL to send a zeroed
 *                    header-only response allocated here.
 * @param dataLength  payload bytes following the header; forced to 0 when
 *                    `pRsp` is NULL.
 * @param code        result code placed in the reply.
 * @return TSDB_CODE_SUCCESS after the reply is handed to tmsgSendRsp(), or
 *         TSDB_CODE_QRY_OUT_OF_MEMORY when the fallback header cannot be allocated
 *         (no reply is sent in that case).
 */
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
                               int32_t code) {
  if (NULL == pRsp) {
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (NULL == pRsp) {
      /* Previously the memset below would have dereferenced NULL. */
      qError("rpcMallocCont %d failed", (int32_t)sizeof(SRetrieveTableRsp));
      QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp) + dataLength,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
168
#if 0
S
Shengliang Guan 已提交
169
/**
 * Build an STaskCancelRsp carrying `code` and send it on `pConn` as a
 * TDMT_SCH_CANCEL_TASK_RSP. This function sits inside an `#if 0` region.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the response
 *         buffer cannot be allocated.
 */
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(STaskCancelRsp));
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  pRsp->code = code;

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_CANCEL_TASK_RSP,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
185
/**
 * Build an STaskDropRsp carrying `code` and send it on `pConn` as a
 * TDMT_SCH_DROP_TASK_RSP. This function sits inside an `#if 0` region.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the response
 *         buffer cannot be allocated.
 */
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropRsp));
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  pRsp->code = code;

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_DROP_TASK_RSP,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
200
#endif
D
dapan1121 已提交
201

D
dapan1121 已提交
202
/**
 * Build a TDMT_SCH_DROP_TASK request for the task identified by the
 * QW_FPARAMS_DEF arguments and put it on this worker's FETCH_QUEUE.
 *
 * @param pConn  connection info copied into the queued message.
 * @return TSDB_CODE_SUCCESS when the message is queued;
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if sizing, allocation or serialization
 *         fails; otherwise the code returned by tmsgPutToQueue().
 */
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  /* Zero-initialise so that no indeterminate field can reach the serializer. */
  STaskDropReq qMsg = {0};
  qMsg.header.vgId = mgmt->nodeId;
  qMsg.header.contLen = 0;
  qMsg.sId = sId;
  qMsg.queryId = qId;
  qMsg.taskId = tId;
  qMsg.refId = rId;
  qMsg.execId = eId;

  /* First pass with a NULL buffer only computes the encoded size. */
  int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
  if (msgSize < 0) {
    QW_SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
    QW_SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
    rpcFreeCont(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg pNewMsg = {
      .msgType = TDMT_SCH_DROP_TASK,
      .pCont = msg,
      .contLen = msgSize,
      .code = 0,
      .info = *pConn,
  };

  int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg);
  if (TSDB_CODE_SUCCESS != code) {
    /* NOTE(review): `msg` is not released here; confirm whether tmsgPutToQueue frees it on failure. */
    QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
    QW_ERR_RET(code);
  }

  QW_SCH_TASK_DLOG("drop task msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
249
/**
 * Allocate a TDMT_SCH_QUERY_CONTINUE request for the task identified by the
 * QW_FPARAMS_DEF arguments and put it on this worker's QUERY_QUEUE.
 *
 * @param pConn  connection info copied into the queued message.
 * @return TSDB_CODE_SUCCESS when queued, TSDB_CODE_QRY_OUT_OF_MEMORY when the
 *         request cannot be allocated, otherwise the tmsgPutToQueue() code.
 */
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  SQueryContinueReq *pReq = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
  if (NULL == pReq) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  /* Identify the task that should continue. */
  pReq->header.vgId = mgmt->nodeId;
  pReq->sId = sId;
  pReq->queryId = qId;
  pReq->taskId = tId;
  pReq->execId = eId;

  SRpcMsg contMsg = {0};
  contMsg.msgType = TDMT_SCH_QUERY_CONTINUE;
  contMsg.pCont = pReq;
  contMsg.contLen = sizeof(SQueryContinueReq);
  contMsg.code = 0;
  contMsg.info = *pConn;

  int32_t putCode = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &contMsg);
  if (TSDB_CODE_SUCCESS != putCode) {
    QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(putCode));
    QW_ERR_RET(putCode);
  }

  QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
281
/**
 * Register, via tmsgRegisterBrokenLinkArg(), a pre-built TDMT_SCH_DROP_TASK
 * message (code TSDB_CODE_RPC_BROKEN_LINK) for the task identified by the
 * QW_FPARAMS_DEF arguments.
 *
 * @param pConn  connection info copied into the registered message.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if sizing,
 *         allocation or serialization of the drop request fails.
 */
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  /* Zero-initialise so that no indeterminate field can reach the serializer. */
  STaskDropReq qMsg = {0};
  qMsg.header.vgId = mgmt->nodeId;
  qMsg.header.contLen = 0;
  qMsg.sId = sId;
  qMsg.queryId = qId;
  qMsg.taskId = tId;
  qMsg.refId = rId;
  qMsg.execId = eId;

  /* First pass with a NULL buffer only computes the encoded size. */
  int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
  if (msgSize < 0) {
    QW_SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
    QW_SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
    rpcFreeCont(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg brokenMsg = {
      .msgType = TDMT_SCH_DROP_TASK,
      .pCont = msg,
      .contLen = msgSize,
      .code = TSDB_CODE_RPC_BROKEN_LINK,
      .info = *pConn,
  };

  tmsgRegisterBrokenLinkArg(&brokenMsg);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
322
/**
 * Register, via tmsgRegisterBrokenLinkArg(), a pre-built
 * TDMT_SCH_QUERY_HEARTBEAT message (code TSDB_CODE_RPC_BROKEN_LINK) for
 * scheduler `sId`.
 *
 * @param mgmt   worker whose nodeId is written into the request header.
 * @param sId    scheduler id stored in the request.
 * @param pConn  connection info copied into the registered message.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if sizing,
 *         allocation or serialization of the heartbeat request fails.
 */
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) {
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  /* First pass with a NULL buffer only computes the encoded size. */
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("rpcMallocCont %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    /* The buffer came from rpcMallocCont(), so it must go back through rpcFreeCont(). */
    rpcFreeCont(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg brokenMsg = {
      .msgType = TDMT_SCH_QUERY_HEARTBEAT,
      .pCont = msg,
      .contLen = msgSize,
      .code = TSDB_CODE_RPC_BROKEN_LINK,
      .info = *pConn,
  };

  tmsgRegisterBrokenLinkArg(&brokenMsg);

  return TSDB_CODE_SUCCESS;
}

356
/**
 * Decode a sub-query message and run qwPreprocessQuery() for it.
 *
 * @param qWorkerMgmt  worker handle (SQWorker *).
 * @param pMsg         incoming RPC message holding a serialized SSubQueryMsg.
 * @param chkGrant     when true, grantCheck(TSDB_GRANT_TIME) is consulted
 *                     unless the message mask passes TEST_SHOW_REWRITE_MASK.
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or an undecodable
 *         message, TSDB_CODE_GRANT_EXPIRED when the grant check fails,
 *         otherwise TSDB_CODE_SUCCESS. The qwPreprocessQuery() result is only
 *         logged, not returned.
 */
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGrant) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t      code = 0;
  SQWorker    *mgmt = (SQWorker *)qWorkerMgmt;
  SSubQueryMsg msg = {0};
  if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
    QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg.msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
    QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask);
    tFreeSSubQueryMsg(&msg);
    QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
  }

  uint64_t sId = msg.sId;
  uint64_t qId = msg.queryId;
  uint64_t tId = msg.taskId;
  int64_t  rId = msg.refId;
  int32_t  eId = msg.execId;

  SQWMsg qwMsg = {
      .msgType = pMsg->msgType, .msg = msg.msg, .msgLen = msg.msgLen, .connInfo = pMsg->info};

  QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p, SQL:%s", pMsg->info.handle, msg.sql);
  code = qwPreprocessQuery(QW_FPARAMS(), &qwMsg);
  QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code);

  /* Release the decoded message, as the other sub-query handlers in this file do. */
  tFreeSSubQueryMsg(&msg);

  /* NOTE(review): `code` is deliberately not propagated here; confirm callers expect that. */
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
391 392 393 394 395
/**
 * Decode a sub-query message and call qwAbortPrerocessQuery() for the task it
 * identifies.
 *
 * @param qWorkerMgmt  worker handle (SQWorker *).
 * @param pMsg         incoming RPC message holding a serialized SSubQueryMsg.
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or an undecodable
 *         message, otherwise TSDB_CODE_SUCCESS.
 */
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQWorker    *mgmt = (SQWorker *)qWorkerMgmt;
  SSubQueryMsg subReq = {0};

  int32_t decoded = tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &subReq);
  if (decoded < 0) {
    QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  /* Task identity taken from the decoded request. */
  uint64_t sId = subReq.sId;
  uint64_t qId = subReq.queryId;
  uint64_t tId = subReq.taskId;
  int64_t  rId = subReq.refId;
  int32_t  eId = subReq.execId;

  QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
  qwAbortPrerocessQuery(QW_FPARAMS());
  QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);

  tFreeSSubQueryMsg(&subReq);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
418
/**
 * Decode a sub-query message and hand it to qwProcessQuery().
 *
 * @param node         node handle forwarded in the SQWMsg.
 * @param qWorkerMgmt  worker handle (SQWorker *).
 * @param pMsg         incoming RPC message holding a serialized SSubQueryMsg.
 * @param ts           timestamp passed to qwUpdateTimeInQueue() for QUERY_QUEUE.
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or an undecodable
 *         message, otherwise TSDB_CODE_SUCCESS. The qwProcessQuery() result is
 *         only logged, not returned.
 */
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
  int32_t   code = 0;

  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);

  SSubQueryMsg subReq = {0};
  int32_t      decoded = tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &subReq);
  if (decoded < 0) {
    QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  /* Task identity taken from the decoded request. */
  uint64_t sId = subReq.sId;
  uint64_t qId = subReq.queryId;
  uint64_t tId = subReq.taskId;
  int64_t  rId = subReq.refId;
  int32_t  eId = subReq.execId;

  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.msg = subReq.msg;
  qwMsg.msgLen = subReq.msgLen;
  qwMsg.connInfo = pMsg->info;
  qwMsg.msgType = pMsg->msgType;
  qwMsg.msgInfo.explain = subReq.explain;
  qwMsg.msgInfo.taskType = subReq.taskType;
  qwMsg.msgInfo.needFetch = subReq.needFetch;

  QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType),
                   pMsg->info.handle, subReq.sql);
  code = qwProcessQuery(QW_FPARAMS(), &qwMsg, subReq.sql);
  /* Cleared so that tFreeSSubQueryMsg() below does not touch the SQL pointer passed above. */
  subReq.sql = NULL;
  QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code);

  tFreeSSubQueryMsg(&subReq);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
461
/**
 * Handle a query-continue message by calling qwProcessCQuery() for the task
 * named in the SQueryContinueReq payload.
 *
 * @param node         node handle forwarded in the SQWMsg.
 * @param qWorkerMgmt  worker handle (SQWorker *); must not be NULL.
 * @param pMsg         incoming RPC message; must not be NULL.
 * @param ts           timestamp passed to qwUpdateTimeInQueue() for QUERY_QUEUE.
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or a payload that is
 *         missing or shorter than SQueryContinueReq; a non-success code from
 *         qwProcessCQuery(); otherwise TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  /* Both pointers are dereferenced below, so reject NULL up front. */
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
  SQWorker          *mgmt = (SQWorker *)qWorkerMgmt;

  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);

  /* Compare as signed: a negative contLen must not be promoted to a huge unsigned value. */
  if (NULL == msg || pMsg->contLen < (int32_t)sizeof(*msg)) {
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = 0;
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};

  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
495
/**
 * Handle a fetch request by calling qwProcessFetch() for the task named in
 * the SResFetchReq payload.
 *
 * The id fields of the payload are converted in place with be64toh()/ntohl()
 * before being used.
 *
 * @param node         node handle forwarded in the SQWMsg.
 * @param qWorkerMgmt  worker handle (SQWorker *).
 * @param pMsg         incoming RPC message.
 * @param ts           timestamp passed to qwUpdateTimeInQueue() for FETCH_QUEUE.
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or a payload that is
 *         missing or shorter than SResFetchReq; a non-success code from
 *         qwProcessFetch(); otherwise TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SResFetchReq *msg = pMsg->pCont;
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);

  /* Compare as signed: a negative contLen must not be promoted to a huge unsigned value. */
  if (NULL == msg || pMsg->contLen < (int32_t)sizeof(*msg)) {
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->execId = ntohl(msg->execId);

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = 0;
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};

  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
533
/**
 * Forward a response message to qProcessRspMsg() and clear pMsg->pCont
 * afterwards.
 *
 * @param node         not used by this function.
 * @param qWorkerMgmt  optional worker handle; when non-NULL its queue timing
 *                     and rspProcessed counter are updated.
 * @param pMsg         response message; dereferenced unconditionally.
 * @param ts           timestamp passed to qwUpdateTimeInQueue() for FETCH_QUEUE.
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;

  /* Statistics are only kept when a worker handle was supplied. */
  if (NULL != mgmt) {
    qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
    QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
  }

  qProcessRspMsg(NULL, pMsg, NULL);

  pMsg->pCont = NULL;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
545
#if 0
D
dapan1121 已提交
546
/**
 * Handle a task-cancel request and reply with qwBuildAndSendCancelRsp().
 * This function sits inside an `#if 0` region; the actual cancel call is
 * commented out, so `code` stays 0.
 *
 * The id fields of the payload are converted in place with be64toh()/ntohl().
 *
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or a payload that is
 *         missing or shorter than STaskCancelReq; a non-success code from
 *         qwBuildAndSendCancelRsp(); otherwise TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker       *mgmt = (SQWorker *)qWorkerMgmt;
  int32_t         code = 0;
  STaskCancelReq *msg = pMsg->pCont;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);

  /* Compare as signed: a negative contLen must not be promoted to a huge unsigned value. */
  if (NULL == msg || pMsg->contLen < (int32_t)sizeof(*msg)) {
    qError("invalid task cancel msg");
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
  msg->execId = ntohl(msg->execId);

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};

  // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));

_return:

  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
586
#endif
D
dapan1121 已提交
587

D
dapan1121 已提交
588
/**
 * Decode a task-drop request and hand it to qwProcessDrop().
 *
 * @param node         node handle forwarded in the SQWMsg.
 * @param qWorkerMgmt  worker handle (SQWorker *).
 * @param pMsg         incoming RPC message holding a serialized STaskDropReq;
 *                     its `code` is forwarded in the SQWMsg.
 * @param ts           timestamp passed to qwUpdateTimeInQueue() for FETCH_QUEUE.
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or an undecodable
 *         message; a non-success code from qwProcessDrop(); otherwise
 *         TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);

  STaskDropReq dropReq = {0};
  int32_t      decoded = tDeserializeSTaskDropReq(pMsg->pCont, pMsg->contLen, &dropReq);
  if (decoded < 0) {
    QW_ELOG("tDeserializeSTaskDropReq failed, contLen:%d", pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  /* Task identity taken from the decoded request. */
  uint64_t sId = dropReq.sId;
  uint64_t qId = dropReq.queryId;
  uint64_t tId = dropReq.taskId;
  int64_t  rId = dropReq.refId;
  int32_t  eId = dropReq.execId;

  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.msg = NULL;
  qwMsg.msgLen = 0;
  qwMsg.code = pMsg->code;
  qwMsg.connInfo = pMsg->info;

  if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
626
/**
 * Decode a scheduler heartbeat request and hand it to qwProcessHb().
 *
 * @param node         node handle forwarded in the SQWMsg.
 * @param qWorkerMgmt  worker handle (SQWorker *).
 * @param pMsg         incoming RPC message holding a serialized
 *                     SSchedulerHbReq; its `code` is forwarded in the SQWMsg.
 * @param ts           timestamp passed to qwUpdateTimeInQueue() for FETCH_QUEUE.
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments, a NULL payload or an
 *         undecodable message; a non-success code from qwProcessHb();
 *         otherwise TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker       *mgmt = (SQWorker *)qWorkerMgmt;
  SSchedulerHbReq req = {0};

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);

  if (NULL == pMsg->pCont) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  /* Any non-zero result from the decoder is treated as failure. */
  if (0 != tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  uint64_t sId = req.sId;

  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.msg = NULL;
  qwMsg.msgLen = 0;
  qwMsg.code = pMsg->code;
  qwMsg.connInfo = pMsg->info;

  if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
663

664 665
/**
 * Decode a delete request and hand it to qwProcessDelete().
 *
 * @param node         node handle forwarded in the SQWMsg.
 * @param qWorkerMgmt  worker handle (SQWorker *).
 * @param pMsg         incoming RPC message holding a serialized SVDeleteReq.
 * @param pRes         result object passed through to qwProcessDelete().
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or an undecodable
 *         message; otherwise the code produced by qwProcessDelete().
 */
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t     code = 0;
  SVDeleteReq req = {0};
  SQWorker   *mgmt = (SQWorker *)qWorkerMgmt;

  QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);

  if (tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    QW_ELOG("tDeserializeSVDeleteReq failed, contLen:%d", pMsg->contLen);
    /* Release whatever the decoder may have allocated before it failed. */
    taosMemoryFreeClear(req.sql);
    taosMemoryFreeClear(req.msg);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  uint64_t sId = req.sId;
  uint64_t qId = req.queryId;
  uint64_t tId = req.taskId;
  int64_t  rId = 0;
  int32_t  eId = -1;

  SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
  QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
  taosMemoryFreeClear(req.sql);

  QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));

  QW_SCH_TASK_DLOG("processDelete end, node:%p", node);

_return:

  /* Freed on both the success and the error path so a failed delete no longer leaks it. */
  taosMemoryFreeClear(req.msg);

  QW_RET(code);
}