qwMsg.c 14.0 KB
Newer Older
D
dapan1121 已提交
1
#include "qwMsg.h"
L
Liu Jicong 已提交
2
#include "dataSinkMgt.h"
D
dapan1121 已提交
3 4 5
#include "executor.h"
#include "planner.h"
#include "query.h"
L
Liu Jicong 已提交
6
#include "qworker.h"
D
dapan1121 已提交
7
#include "qwInt.h"
L
Liu Jicong 已提交
8
#include "tcommon.h"
D
dapan1121 已提交
9 10 11 12 13
#include "tmsg.h"
#include "tname.h"

/*
 * Allocate an RPC buffer that holds a fetch-response header followed by
 * `length` bytes of payload.
 *
 * Only the SRetrieveTableRsp header is zeroed; the payload area behind it is
 * left exactly as rpcMallocCont() returned it.
 *
 * On success the buffer is stored in *rsp and TSDB_CODE_SUCCESS is returned.
 * If the allocation fails the error is logged and
 * TSDB_CODE_QRY_OUT_OF_MEMORY is handed to QW_RET().
 */
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
  int32_t totalLen = sizeof(SRetrieveTableRsp) + length;

  SRetrieveTableRsp *pBuf = (SRetrieveTableRsp *)rpcMallocCont(totalLen);
  if (pBuf == NULL) {
    qError("rpcMallocCont %d failed", totalLen);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Clear the header only; the caller fills in the payload.
  memset(pBuf, 0, sizeof(*pBuf));
  *rsp = pBuf;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
28
/*
 * Fill in the header of a fetch response from a data-sink output descriptor.
 *
 * msg       - buffer that starts with an SRetrieveTableRsp header
 * input     - output descriptor whose metadata is copied into the header
 * len       - value stored (via htonl) in compLen
 * qComplete - value stored in the `completed` flag
 *
 * useconds is converted with htobe64() and the 32-bit counters with htonl();
 * completed, precision and compressed are copied without conversion.
 */
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)msg;

  // Fields copied verbatim.
  pRsp->completed = qComplete;
  pRsp->precision = input->precision;
  pRsp->compressed = input->compressed;

  // Fields that go through a byte-order conversion.
  pRsp->useconds = htobe64(input->useconds);
  pRsp->compLen = htonl(len);
  pRsp->numOfRows = htonl(input->numOfRows);
  pRsp->numOfCols = htonl(input->numOfCols);
}

/*
 * Release a fetch-response buffer with rpcFreeCont().
 * A NULL pointer is ignored.
 */
void qwFreeFetchRsp(void *msg) {
  if (NULL == msg) {
    return;
  }

  rpcFreeCont(msg);
}

D
dapan1121 已提交
46 47
/*
 * Build a query response carrying `code` and send it back on pConn.
 *
 * pConn  - connection info copied into the outgoing SRpcMsg
 * code   - result code, stored both in the response body and the RPC message
 * tbInfo - optional; when non-NULL its tbFName/sversion/tversion are copied
 *          into the response, otherwise those fields stay zeroed
 *
 * Returns TSDB_CODE_SUCCESS once the message has been handed to
 * tmsgSendRsp(), or TSDB_CODE_QRY_OUT_OF_MEMORY when the response buffer
 * cannot be allocated (nothing is sent in that case).
 */
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
  SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(SQueryTableRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Zero the whole body so the table fields are well defined when tbInfo is NULL.
  memset(pRsp, 0, sizeof(*pRsp));
  pRsp->code = code;
  if (tbInfo) {
    // NOTE(review): unbounded copy - assumes tbInfo->tbFName is NUL-terminated
    // and no larger than pRsp->tbFName; confirm against the struct definitions.
    strcpy(pRsp->tbFName, tbInfo->tbFName);
    pRsp->sversion = tbInfo->sversion;
    pRsp->tversion = tbInfo->tversion;
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_QUERY_RSP,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
68
/*
 * Serialize an explain response for `num` sub-plans and send it on pConn.
 *
 * pConn    - connection info copied into the outgoing SRpcMsg (its ahandle
 *            is cleared before sending)
 * execInfo - sub-plan execution info referenced by the response
 * num      - number of entries in execInfo
 *
 * Returns TSDB_CODE_SUCCESS once the message has been handed to
 * tmsgSendRsp(). Returns TSDB_CODE_QRY_OUT_OF_MEMORY, without sending
 * anything, when sizing/serialization fails or the buffer cannot be
 * allocated.
 */
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {
  SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};

  // First pass with a NULL buffer only computes the encoded size.
  // NOTE(review): a negative result is treated as failure, matching how
  // tSerializeSSchedulerHbReq() is checked elsewhere in this file - confirm.
  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    qError("tSerializeSExplainRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSExplainRsp(pRsp, contLen, &rsp) < 0) {
    qError("tSerializeSExplainRsp failed, size:%d", contLen);
    rpcFreeCont(pRsp);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_EXPLAIN_RSP,
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
      .info = *pConn,
  };
  rpcRsp.info.ahandle = NULL;

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
89
/*
 * Serialize a scheduler heartbeat response and send it on pConn.
 *
 * pConn   - connection info copied into the outgoing SRpcMsg
 * pStatus - heartbeat response to serialize
 * code    - result code placed in the RPC message
 *
 * Returns TSDB_CODE_SUCCESS once the message has been handed to
 * tmsgSendRsp(). Returns TSDB_CODE_QRY_OUT_OF_MEMORY, without sending
 * anything, when sizing/serialization fails or the buffer cannot be
 * allocated.
 */
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
  // First pass with a NULL buffer only computes the encoded size.
  // NOTE(review): a negative result is treated as failure, matching how
  // tSerializeSSchedulerHbReq() is checked elsewhere in this file - confirm.
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
  if (contLen < 0) {
    qError("tSerializeSSchedulerHbRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus) < 0) {
    qError("tSerializeSSchedulerHbRsp failed, size:%d", contLen);
    rpcFreeCont(pRsp);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
      .contLen = contLen,
      .pCont = pRsp,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
107
/*
 * Send a fetch response on pConn.
 *
 * pConn      - connection info copied into the outgoing SRpcMsg
 * pRsp       - prepared response buffer (header + dataLength payload bytes),
 *              or NULL to send an empty, zeroed header
 * dataLength - payload bytes that follow the header; forced to 0 when pRsp
 *              is NULL
 * code       - result code placed in the RPC message
 *
 * Returns TSDB_CODE_SUCCESS once the message has been handed to
 * tmsgSendRsp(), or TSDB_CODE_QRY_OUT_OF_MEMORY when the fallback header
 * cannot be allocated (nothing is sent in that case).
 */
int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
  if (NULL == pRsp) {
    // No prepared response: fall back to an empty header-only reply.
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (NULL == pRsp) {
      qError("rpcMallocCont %d failed", (int32_t)sizeof(SRetrieveTableRsp));
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_FETCH_RSP,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp) + dataLength,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
127
/*
 * Build a task-cancel response carrying `code` and send it on pConn.
 *
 * Returns TSDB_CODE_SUCCESS once the message has been handed to
 * tmsgSendRsp(), or TSDB_CODE_QRY_OUT_OF_MEMORY when the response buffer
 * cannot be allocated (nothing is sent in that case).
 */
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(STaskCancelRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pRsp->code = code;

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_CANCEL_TASK_RSP,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
143
/*
 * Build a task-drop response carrying `code` and send it on pConn.
 *
 * Returns TSDB_CODE_SUCCESS once the message has been handed to
 * tmsgSendRsp(), or TSDB_CODE_QRY_OUT_OF_MEMORY when the response buffer
 * cannot be allocated (nothing is sent in that case).
 */
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pRsp->code = code;

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_DROP_TASK_RSP,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
159
/*
 * Build a "query continue" request for the current task and put it on this
 * worker's own QUERY_QUEUE.
 *
 * The ids are stored as-is (no byte-order conversion is applied here).
 * mgmt/sId/qId/tId are supplied through QW_FPARAMS_DEF.
 *
 * Returns TSDB_CODE_SUCCESS when the message was queued,
 * TSDB_CODE_QRY_OUT_OF_MEMORY when the request cannot be allocated, or the
 * error from tmsgPutToQueue() (the request is freed in that case).
 */
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  SQueryContinueReq *pReq = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
  if (pReq == NULL) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pReq->header.vgId = mgmt->nodeId;
  pReq->sId = sId;
  pReq->queryId = qId;
  pReq->taskId = tId;

  SRpcMsg continueMsg = {
      .msgType = TDMT_VND_QUERY_CONTINUE,
      .pCont = pReq,
      .contLen = sizeof(SQueryContinueReq),
      .code = 0,
      .info = *pConn,
  };

  int32_t putCode = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &continueMsg);
  if (putCode != TSDB_CODE_SUCCESS) {
    QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(putCode));
    // The queue did not accept the message, so the request is released here.
    rpcFreeCont(pReq);
    QW_ERR_RET(putCode);
  }

  QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
191
/*
 * Prepare a drop-task request for the current task and register it with
 * tmsgRegisterBrokenLinkArg(), tagged with TSDB_CODE_RPC_NETWORK_UNAVAIL.
 *
 * Unlike the continue request, the ids here are converted with
 * htonl()/htobe64(), matching the be64toh() decoding done by the drop
 * message handler. mgmt/sId/qId/tId/rId are supplied through QW_FPARAMS_DEF.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the request
 * cannot be allocated.
 */
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  STaskDropReq *pReq = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (pReq == NULL) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pReq->header.vgId = htonl(mgmt->nodeId);
  pReq->sId = htobe64(sId);
  pReq->queryId = htobe64(qId);
  pReq->taskId = htobe64(tId);
  pReq->refId = htobe64(rId);

  SRpcMsg dropOnBreak = {
      .msgType = TDMT_VND_DROP_TASK,
      .pCont = pReq,
      .contLen = sizeof(STaskDropReq),
      .code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
      .info = *pConn,
  };

  tmsgRegisterBrokenLinkArg(&dropOnBreak);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
217
/*
 * Serialize a scheduler heartbeat request for scheduler `sId` and register
 * it with tmsgRegisterBrokenLinkArg(), tagged with
 * TSDB_CODE_RPC_NETWORK_UNAVAIL.
 *
 * mgmt  - worker whose nodeId is stored in the request header
 * sId   - scheduler id stored in the request
 * pConn - connection info copied into the registered SRpcMsg
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when
 * sizing/serialization fails or the buffer cannot be allocated.
 */
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) {
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  // First pass with a NULL buffer only computes the encoded size.
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("rpcMallocCont %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    // The buffer came from rpcMallocCont(), so it must go back through
    // rpcFreeCont() like every other RPC buffer in this file.
    rpcFreeCont(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg brokenMsg = {
      .msgType = TDMT_VND_QUERY_HEARTBEAT,
      .pCont = msg,
      .contLen = msgSize,
      .code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
      .info = *pConn,
  };

  tmsgRegisterBrokenLinkArg(&brokenMsg);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
251
/*
 * Entry point for a sub-query request.
 *
 * Validates the message, converts its header fields from network byte order
 * in place, and passes the physical-plan portion of the payload to
 * qwProcessQuery(). The payload is laid out as sqlLen bytes of SQL text
 * followed by phyLen bytes of plan.
 *
 * node        - opaque node handle forwarded to the worker
 * qWorkerMgmt - SQWorker instance
 * pMsg        - incoming RPC message; pCont is modified in place
 * ts          - timestamp passed to qwUpdateTimeInQueue()
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments
 * or a malformed message, or the error from qwProcessQuery().
 */
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SSubQueryMsg *msg = pMsg->pCont;
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);

  // Compare as signed 64-bit values: comparing contLen directly with sizeof()
  // would convert a negative length to a huge unsigned one and let it pass.
  if (NULL == msg || (int64_t)pMsg->contLen <= (int64_t)sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
  msg->phyLen = ntohl(msg->phyLen);
  msg->sqlLen = ntohl(msg->sqlLen);

  // Both lengths come straight off the wire: make sure the SQL text and the
  // plan fit inside the received payload before reading either of them.
  // NOTE(review): assumes msg->msg is the trailing variable-length area and
  // starts no later than sizeof(*msg) bytes into the message - confirm.
  int64_t payloadLen = (int64_t)pMsg->contLen - (int64_t)sizeof(*msg);
  int64_t sqlLen = (int64_t)msg->sqlLen;
  int64_t phyLen = (int64_t)msg->phyLen;
  if (sqlLen < 0 || phyLen < 0 || sqlLen + phyLen > payloadLen) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // These names are picked up by QW_FPARAMS() and the QW_SCH_TASK_* macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;

  SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};

  // The SQL text is not NUL-terminated inside the message, so log a copy.
  char *sql = strndup(msg->msg, msg->sqlLen);
  QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql ? sql : "(null)");
  taosMemoryFreeClear(sql);

  QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));

  QW_SCH_TASK_DLOG("processQuery end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
292
/*
 * Entry point for a "query continue" request.
 *
 * The ids are read from the message without byte-order conversion, which
 * matches how qwBuildAndSendCQueryMsg() fills them in. rId is always 0 here.
 *
 * node        - opaque node handle forwarded to the worker
 * qWorkerMgmt - SQWorker instance
 * pMsg        - incoming RPC message
 * ts          - timestamp passed to qwUpdateTimeInQueue()
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments
 * or a short message, or the error from qwProcessCQuery().
 */
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  // Same argument guard as the other message handlers in this file.
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
  SQWorker *         mgmt = (SQWorker *)qWorkerMgmt;

  qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);

  // Compare as signed 64-bit values: comparing contLen directly with sizeof()
  // would convert a negative length to a huge unsigned one and let it pass.
  if (NULL == msg || (int64_t)pMsg->contLen < (int64_t)sizeof(*msg)) {
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // These names are picked up by QW_FPARAMS() and the QW_SCH_TASK_* macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = 0;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};

  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
325
/*
 * Entry point for a fetch request.
 *
 * Validates the message, converts its ids from big-endian in place, and
 * hands the request to qwProcessFetch(). rId is always 0 here.
 *
 * node        - opaque node handle forwarded to the worker
 * qWorkerMgmt - SQWorker instance
 * pMsg        - incoming RPC message; pCont is modified in place
 * ts          - timestamp passed to qwUpdateTimeInQueue()
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments
 * or a short message, or the error from qwProcessFetch().
 */
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SResFetchReq *msg = pMsg->pCont;
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);

  // Compare as signed 64-bit values: comparing contLen directly with sizeof()
  // would convert a negative length to a huge unsigned one and let it pass.
  if (NULL == msg || (int64_t)pMsg->contLen < (int64_t)sizeof(*msg)) {
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);

  // These names are picked up by QW_FPARAMS() and the QW_SCH_TASK_* macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = 0;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};

  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
361 362
/*
 * Entry point for a fetch response.
 *
 * Updates the queue statistics when a worker is supplied, forwards the
 * message to qProcessFetchRsp(), and then clears pMsg->pCont.
 *
 * node        - unused here
 * qWorkerMgmt - SQWorker instance; may be NULL, in which case no statistics
 *               are recorded
 * pMsg        - incoming RPC message
 * ts          - timestamp passed to qwUpdateTimeInQueue()
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_INVALID_INPUT when pMsg is
 * NULL.
 */
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  // pMsg is dereferenced below, so reject NULL up front.
  if (NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
  if (mgmt) {
    qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
    QW_STAT_INC(mgmt->stat.msgStat.fetchRspProcessed, 1);
  }

  qProcessFetchRsp(NULL, pMsg, NULL);
  // NOTE(review): presumably qProcessFetchRsp() takes over the content
  // buffer, and clearing the pointer keeps the caller from freeing it - confirm.
  pMsg->pCont = NULL;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
373
/*
 * Entry point for a task-cancel request.
 *
 * Validates the message, converts its ids from big-endian in place, and
 * replies with a cancel response. The actual cancellation call
 * (qwCancelTask) is currently disabled, so `code` is always 0 and this
 * handler only acknowledges the request.
 *
 * node        - opaque node handle
 * qWorkerMgmt - SQWorker instance
 * pMsg        - incoming RPC message; pCont is modified in place
 * ts          - timestamp passed to qwUpdateTimeInQueue()
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments
 * or a short message, or the error from qwBuildAndSendCancelRsp().
 */
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
  int32_t         code = 0;
  STaskCancelReq *msg = pMsg->pCont;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);

  // Compare as signed 64-bit values: comparing contLen directly with sizeof()
  // would convert a negative length to a huge unsigned one and let it pass.
  if (NULL == msg || (int64_t)pMsg->contLen < (int64_t)sizeof(*msg)) {
    qError("invalid task cancel msg");
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);

  // These names are picked up by the QW_SCH_TASK_* logging macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};

  // TODO: task cancellation is not wired in yet; once it is, its result
  // should be stored in `code` before the response is built.

  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
412
/*
 * Entry point for a task-drop request.
 *
 * Validates the message, converts its ids from big-endian in place, and
 * hands the request (together with the RPC-level code) to qwProcessDrop().
 * A message arriving with TSDB_CODE_RPC_NETWORK_UNAVAIL is logged as a drop
 * caused by a broken link.
 *
 * node        - opaque node handle forwarded to the worker
 * qWorkerMgmt - SQWorker instance
 * pMsg        - incoming RPC message; pCont is modified in place
 * ts          - timestamp passed to qwUpdateTimeInQueue()
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments
 * or a short message, or the error from qwProcessDrop().
 */
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  STaskDropReq *msg = pMsg->pCont;
  SQWorker *    mgmt = (SQWorker *)qWorkerMgmt;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);

  // Compare as signed 64-bit values: comparing contLen directly with sizeof()
  // would convert a negative length to a huge unsigned one and let it pass.
  if (NULL == msg || (int64_t)pMsg->contLen < (int64_t)sizeof(*msg)) {
    QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);

  // These names are picked up by QW_FPARAMS() and the QW_SCH_TASK_* macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};

  if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
454
/*
 * Entry point for a scheduler heartbeat request.
 *
 * Deserializes the request from the message content and hands it (together
 * with the RPC-level code) to qwProcessHb(). A message arriving with
 * TSDB_CODE_RPC_NETWORK_UNAVAIL is logged as a heartbeat caused by a broken
 * link.
 *
 * node        - opaque node handle forwarded to the worker
 * qWorkerMgmt - SQWorker instance
 * pMsg        - incoming RPC message
 * ts          - timestamp passed to qwUpdateTimeInQueue()
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments,
 * missing content or a request that fails to deserialize, or the error from
 * qwProcessHb().
 */
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (node == NULL || qWorkerMgmt == NULL || pMsg == NULL) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker *      mgmt = (SQWorker *)qWorkerMgmt;
  SSchedulerHbReq req = {0};
  int32_t         code = 0;

  qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
  QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);

  if (pMsg->pCont == NULL) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    // Release whatever the partial deserialization may have attached to req.
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // sId keeps its name because the QW_SCH_* logging macros refer to it.
  uint64_t sId = req.sId;
  SQWMsg   hbMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};

  if (pMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);

  QW_ERR_RET(qwProcessHb(mgmt, &hbMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}