qworkerMsg.c 18.6 KB
Newer Older
L
Liu Jicong 已提交
1 2
#include "qworkerMsg.h"
#include "dataSinkMgt.h"
D
dapan1121 已提交
3 4 5
#include "executor.h"
#include "planner.h"
#include "query.h"
L
Liu Jicong 已提交
6
#include "qworker.h"
D
dapan1121 已提交
7
#include "qworkerInt.h"
L
Liu Jicong 已提交
8
#include "tcommon.h"
D
dapan1121 已提交
9 10 11 12 13
#include "tmsg.h"
#include "tname.h"

int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
  int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
L
Liu Jicong 已提交
14

D
dapan1121 已提交
15 16 17 18 19 20 21 22 23 24 25 26 27
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", msgSize);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memset(pRsp, 0, sizeof(SRetrieveTableRsp));

  *rsp = pRsp;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
28
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
D
dapan1121 已提交
29
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
L
Liu Jicong 已提交
30

D
dapan1121 已提交
31
  rsp->useconds = htobe64(input->useconds);
D
dapan1121 已提交
32
  rsp->completed = qComplete;
D
dapan1121 已提交
33 34 35 36
  rsp->precision = input->precision;
  rsp->compressed = input->compressed;
  rsp->compLen = htonl(len);
  rsp->numOfRows = htonl(input->numOfRows);
37
  rsp->numOfCols = htonl(input->numOfCols);
D
dapan1121 已提交
38 39 40
}

void qwFreeFetchRsp(void *msg) {
D
dapan 已提交
41 42 43
  if (msg) {
    rpcFreeCont(msg);
  }
D
dapan1121 已提交
44 45
}

S
Shengliang Guan 已提交
46
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) {
D
dapan1121 已提交
47
  SQueryTableRsp rsp = {.code = code};
L
Liu Jicong 已提交
48

D
dapan1121 已提交
49
  int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
L
Liu Jicong 已提交
50
  void   *msg = rpcMallocCont(contLen);
D
dapan1121 已提交
51
  tSerializeSQueryTableRsp(msg, contLen, &rsp);
D
dapan1121 已提交
52 53

  SRpcMsg rpcRsp = {
L
Liu Jicong 已提交
54
      .msgType = TDMT_VND_QUERY_RSP,
S
Shengliang Guan 已提交
55
      .info = pConn,
L
Liu Jicong 已提交
56 57 58
      .pCont = msg,
      .contLen = contLen,
      .code = code,
D
dapan1121 已提交
59 60
  };

S
shm  
Shengliang Guan 已提交
61
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
62 63 64 65

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
66
int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code) {
D
dapan1121 已提交
67 68 69 70
  SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
L
Liu Jicong 已提交
71
      .msgType = TDMT_VND_RES_READY_RSP,
S
Shengliang Guan 已提交
72
      .info = pConn,
S
Shengliang Guan 已提交
73
      .info.ahandle = NULL,
L
Liu Jicong 已提交
74 75 76
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
D
dapan1121 已提交
77 78
  };

S
shm  
Shengliang Guan 已提交
79
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
80 81 82 83

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
84
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {
D
dapan1121 已提交
85 86 87
  SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};

  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
L
Liu Jicong 已提交
88
  void   *pRsp = rpcMallocCont(contLen);
D
dapan1121 已提交
89 90 91
  tSerializeSExplainRsp(pRsp, contLen, &rsp);

  SRpcMsg rpcRsp = {
L
Liu Jicong 已提交
92
      .msgType = TDMT_VND_EXPLAIN_RSP,
S
Shengliang Guan 已提交
93
      .info = pConn,
L
Liu Jicong 已提交
94 95 96
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
D
dapan1121 已提交
97 98 99 100 101 102 103
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
104
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
D
dapan1121 已提交
105
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
L
Liu Jicong 已提交
106
  void   *pRsp = rpcMallocCont(contLen);
D
dapan1121 已提交
107
  tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
D
dapan1121 已提交
108 109

  SRpcMsg rpcRsp = {
L
Liu Jicong 已提交
110
      .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
S
Shengliang Guan 已提交
111
      .info = pConn,
L
Liu Jicong 已提交
112 113 114
      .pCont = pRsp,
      .contLen = contLen,
      .code = code,
D
dapan1121 已提交
115 116
  };

S
shm  
Shengliang Guan 已提交
117
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
118 119 120 121

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
122
int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
D
dapan1121 已提交
123 124 125 126 127 128 129
  if (NULL == pRsp) {
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
L
Liu Jicong 已提交
130
      .msgType = TDMT_VND_FETCH_RSP,
S
Shengliang Guan 已提交
131
      .info = pConn,
L
Liu Jicong 已提交
132 133 134
      .pCont = pRsp,
      .contLen = sizeof(*pRsp) + dataLength,
      .code = code,
D
dapan1121 已提交
135 136
  };

S
shm  
Shengliang Guan 已提交
137
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
138 139 140 141

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
142
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
D
dapan1121 已提交
143 144 145 146
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
L
Liu Jicong 已提交
147
      .msgType = TDMT_VND_CANCEL_TASK_RSP,
S
Shengliang Guan 已提交
148
      .info = pConn,
L
Liu Jicong 已提交
149 150 151
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
D
dapan1121 已提交
152 153
  };

S
shm  
Shengliang Guan 已提交
154
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
155 156 157
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
158
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
D
dapan1121 已提交
159 160 161 162
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
L
Liu Jicong 已提交
163
      .msgType = TDMT_VND_DROP_TASK_RSP,
S
Shengliang Guan 已提交
164
      .info = pConn,
L
Liu Jicong 已提交
165 166 167
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
D
dapan1121 已提交
168 169
  };

S
shm  
Shengliang Guan 已提交
170
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
171 172 173
  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
174
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
S
Shengliang Guan 已提交
175 176 177 178
  int32_t         numOfCols = 6;
  SVShowTablesRsp showRsp = {0};

  // showRsp.showId = 1;
wafwerar's avatar
wafwerar 已提交
179
  showRsp.tableMeta.pSchemas = taosMemoryCalloc(numOfCols, sizeof(SSchema));
S
Shengliang Guan 已提交
180 181 182 183
  if (showRsp.tableMeta.pSchemas == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
D
dapan1121 已提交
184

L
Liu Jicong 已提交
185
  col_id_t cols = 0;
S
Shengliang Guan 已提交
186
  SSchema *pSchema = showRsp.tableMeta.pSchemas;
D
dapan1121 已提交
187 188

  const SSchema *s = tGetTbnameColumnSchema();
S
Shengliang Guan 已提交
189
  *pSchema = createSchema(s->type, s->bytes, ++cols, "name");
D
dapan1121 已提交
190 191 192
  pSchema++;

  int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
S
Shengliang Guan 已提交
193
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "created");
D
dapan1121 已提交
194 195 196
  pSchema++;

  type = TSDB_DATA_TYPE_SMALLINT;
S
Shengliang Guan 已提交
197
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "columns");
D
dapan1121 已提交
198 199
  pSchema++;

S
Shengliang Guan 已提交
200
  *pSchema = createSchema(s->type, s->bytes, ++cols, "stable");
D
dapan1121 已提交
201 202 203
  pSchema++;

  type = TSDB_DATA_TYPE_BIGINT;
S
Shengliang Guan 已提交
204
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "uid");
D
dapan1121 已提交
205 206 207
  pSchema++;

  type = TSDB_DATA_TYPE_INT;
S
Shengliang Guan 已提交
208
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "vgId");
D
dapan1121 已提交
209 210

  assert(cols == numOfCols);
S
Shengliang Guan 已提交
211 212 213 214 215
  showRsp.tableMeta.numOfColumns = cols;

  int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
  void   *pBuf = rpcMallocCont(bufLen);
  tSerializeSShowRsp(pBuf, bufLen, &showRsp);
D
dapan1121 已提交
216 217

  SRpcMsg rpcMsg = {
S
Shengliang Guan 已提交
218
      .info = pMsg->info,
S
Shengliang Guan 已提交
219 220 221
      .pCont = pBuf,
      .contLen = bufLen,
      .code = code,
D
dapan1121 已提交
222 223
  };

S
shm  
Shengliang Guan 已提交
224
  tmsgSendRsp(&rpcMsg);
D
dapan1121 已提交
225 226 227
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
228
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchReq) {
D
dapan1121 已提交
229
  SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
L
Liu Jicong 已提交
230
  int32_t               handle = htonl(pFetchReq->id);
D
dapan1121 已提交
231 232 233

  pRsp->numOfRows = 0;
  SRpcMsg rpcMsg = {
S
Shengliang Guan 已提交
234
      .info = pMsg->info,
L
Liu Jicong 已提交
235
      .pCont = pRsp,
D
dapan1121 已提交
236
      .contLen = sizeof(*pRsp),
L
Liu Jicong 已提交
237
      .code = 0,
D
dapan1121 已提交
238 239
  };

S
shm  
Shengliang Guan 已提交
240
  tmsgSendRsp(&rpcMsg);
D
dapan1121 已提交
241 242 243
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
244
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
L
Liu Jicong 已提交
245
  SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
D
dapan1121 已提交
246 247 248 249 250 251 252 253 254 255 256
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->header.vgId = mgmt->nodeId;
  req->sId = sId;
  req->queryId = qId;
  req->taskId = tId;

  SRpcMsg pNewMsg = {
S
Shengliang Guan 已提交
257 258
      .info.handle = pConn->handle,
      .info.ahandle = pConn->ahandle,
L
Liu Jicong 已提交
259
      .msgType = TDMT_VND_QUERY_CONTINUE,
S
Shengliang Guan 已提交
260
      .info.refId = pConn->refId,
L
Liu Jicong 已提交
261 262 263
      .pCont = req,
      .contLen = sizeof(SQueryContinueReq),
      .code = 0,
D
dapan1121 已提交
264 265
  };

S
Shengliang Guan 已提交
266
  int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
D
dapan1121 已提交
267
  if (TSDB_CODE_SUCCESS != code) {
H
Haojun Liao 已提交
268
    QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
D
dapan1121 已提交
269 270 271 272
    rpcFreeCont(req);
    QW_ERR_RET(code);
  }

D
dapan1121 已提交
273
  QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);
D
dapan1121 已提交
274 275 276 277

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
278
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
L
Liu Jicong 已提交
279
  STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
D
dapan1121 已提交
280 281 282
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
L
Liu Jicong 已提交
283
  }
D
dapan1121 已提交
284

D
dapan1121 已提交
285 286 287 288 289
  req->header.vgId = htonl(mgmt->nodeId);
  req->sId = htobe64(sId);
  req->queryId = htobe64(qId);
  req->taskId = htobe64(tId);
  req->refId = htobe64(rId);
L
Liu Jicong 已提交
290

D
dapan1121 已提交
291
  SRpcMsg pMsg = {
S
Shengliang Guan 已提交
292 293 294
      .info.handle = pConn->handle,
      .info.ahandle = pConn->ahandle,
      .info.refId = pConn->refId,
L
Liu Jicong 已提交
295 296 297 298
      .msgType = TDMT_VND_DROP_TASK,
      .pCont = req,
      .contLen = sizeof(STaskDropReq),
      .code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
D
dapan1121 已提交
299
  };
L
Liu Jicong 已提交
300

S
Shengliang Guan 已提交
301
  tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);
D
dapan1121 已提交
302 303 304 305

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
306
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) {
D
dapan1121 已提交
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("calloc %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    taosMemoryFree(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
L
Liu Jicong 已提交
326

D
dapan1121 已提交
327
  SRpcMsg pMsg = {
S
Shengliang Guan 已提交
328 329 330
      .info.handle = pConn->handle,
      .info.ahandle = pConn->ahandle,
      .info.refId = pConn->refId,
L
Liu Jicong 已提交
331 332 333 334
      .msgType = TDMT_VND_QUERY_HEARTBEAT,
      .pCont = msg,
      .contLen = msgSize,
      .code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
D
dapan1121 已提交
335
  };
L
Liu Jicong 已提交
336

D
dapan1121 已提交
337 338 339 340 341
  tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
342 343 344 345 346
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

L
Liu Jicong 已提交
347
  int32_t       code = 0;
D
dapan1121 已提交
348
  SSubQueryMsg *msg = pMsg->pCont;
D
dapan1121 已提交
349
  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
350

D
dapan1121 已提交
351
  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
D
dapan1121 已提交
352
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
353
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
354 355
  }

L
Liu Jicong 已提交
356
  msg->sId = be64toh(msg->sId);
D
dapan1121 已提交
357
  msg->queryId = be64toh(msg->queryId);
L
Liu Jicong 已提交
358 359 360 361
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
  msg->phyLen = ntohl(msg->phyLen);
  msg->sqlLen = ntohl(msg->sqlLen);
362

D
dapan1121 已提交
363 364 365
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
366
  int64_t  rId = msg->refId;
D
dapan1121 已提交
367

D
dapan1121 已提交
368
  SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen};
S
Shengliang Guan 已提交
369 370 371
  qwMsg.connInfo.handle = pMsg->info.handle;
  qwMsg.connInfo.ahandle = pMsg->info.ahandle;
  qwMsg.connInfo.refId = pMsg->info.refId;
D
dapan1121 已提交
372

L
Liu Jicong 已提交
373
  char *sql = strndup(msg->msg, msg->sqlLen);
S
Shengliang Guan 已提交
374
  QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
wafwerar's avatar
wafwerar 已提交
375
  taosMemoryFreeClear(sql);
376

D
dapan1121 已提交
377
  QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));
378

D
dapan1121 已提交
379
  QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
380

L
Liu Jicong 已提交
381
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
382 383
}

384
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
L
Liu Jicong 已提交
385 386 387
  int32_t            code = 0;
  int8_t             status = 0;
  bool               queryDone = false;
388
  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
L
Liu Jicong 已提交
389 390
  bool               needStop = false;
  SQWTaskCtx        *handles = NULL;
D
dapan1121 已提交
391
  SQWorker      *mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
392

D
dapan1121 已提交
393
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
394
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
395
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
396 397
  }

398 399 400
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
L
Liu Jicong 已提交
401
  int64_t  rId = 0;
D
dapan1121 已提交
402

D
dapan1121 已提交
403
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
S
Shengliang Guan 已提交
404 405 406
  qwMsg.connInfo.handle = pMsg->info.handle;
  qwMsg.connInfo.ahandle = pMsg->info.ahandle;
  qwMsg.connInfo.refId = pMsg->info.refId;
D
dapan1121 已提交
407

S
Shengliang Guan 已提交
408
  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);
409 410 411

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
412
  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);
413

L
Liu Jicong 已提交
414
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
415 416
}

L
Liu Jicong 已提交
417
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
D
dapan1121 已提交
418 419 420 421
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

D
dapan1121 已提交
422
  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
423 424
  SResReadyReq *msg = pMsg->pCont;
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
425
    QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
426
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
427
  }
D
dapan1121 已提交
428

D
dapan1121 已提交
429 430 431 432 433 434 435
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
L
Liu Jicong 已提交
436
  int64_t  rId = 0;
D
dapan1121 已提交
437

D
dapan1121 已提交
438
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
S
Shengliang Guan 已提交
439 440 441
  qwMsg.connInfo.handle = pMsg->info.handle;
  qwMsg.connInfo.ahandle = pMsg->info.ahandle;
  qwMsg.connInfo.refId = pMsg->info.refId;
D
dapan1121 已提交
442

S
Shengliang Guan 已提交
443
  QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->info.handle);
444

D
dapan1121 已提交
445
  QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg));
446

D
dapan1121 已提交
447
  QW_SCH_TASK_DLOG("processReady end, node:%p", node);
L
Liu Jicong 已提交
448

D
dapan1121 已提交
449 450 451 452 453 454 455 456
  return TSDB_CODE_SUCCESS;
}

int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

L
Liu Jicong 已提交
457
  int32_t             code = 0;
D
dapan1121 已提交
458 459 460 461
  SSchTasksStatusReq *msg = pMsg->pCont;
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    qError("invalid task status msg");
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
462
  }
D
dapan1121 已提交
463

D
dapan1121 已提交
464
  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
D
dapan1121 已提交
465
  msg->sId = htobe64(msg->sId);
D
dapan1121 已提交
466
  uint64_t sId = msg->sId;
D
dapan1121 已提交
467 468

  SSchedulerStatusRsp *sStatus = NULL;
L
Liu Jicong 已提交
469 470

  // QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
D
dapan1121 已提交
471 472 473

_return:

L
Liu Jicong 已提交
474
  // QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
D
dapan1121 已提交
475 476 477 478 479 480 481 482 483 484

  return TSDB_CODE_SUCCESS;
}

int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SResFetchReq *msg = pMsg->pCont;
D
dapan1121 已提交
485
  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
486

D
dapan1121 已提交
487
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
L
Liu Jicong 已提交
488
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
489
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
490
  }
D
dapan1121 已提交
491

D
dapan1121 已提交
492 493 494
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
D
dapan1121 已提交
495 496 497 498

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
L
Liu Jicong 已提交
499
  int64_t  rId = 0;
D
dapan1121 已提交
500

D
dapan1121 已提交
501
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
S
Shengliang Guan 已提交
502 503 504
  qwMsg.connInfo.handle = pMsg->info.handle;
  qwMsg.connInfo.ahandle = pMsg->info.ahandle;
  qwMsg.connInfo.refId = pMsg->info.refId;
D
dapan1121 已提交
505

S
Shengliang Guan 已提交
506
  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
507 508 509

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
510
  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);
511

L
Liu Jicong 已提交
512
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
513 514
}

S
Shengliang 已提交
515
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
516
  qProcessFetchRsp(NULL, pMsg, NULL);
S
Shengliang 已提交
517 518 519
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
520 521 522 523 524
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

D
dapan1121 已提交
525
  SQWorker   *mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
526
  int32_t         code = 0;
D
dapan1121 已提交
527 528
  STaskCancelReq *msg = pMsg->pCont;
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
L
Liu Jicong 已提交
529
    qError("invalid task cancel msg");
D
dapan1121 已提交
530
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
531
  }
D
dapan1121 已提交
532

D
dapan1121 已提交
533 534 535 536
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
537

D
dapan1121 已提交
538 539 540 541 542
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;

D
dapan1121 已提交
543
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
S
Shengliang Guan 已提交
544 545 546
  qwMsg.connInfo.handle = pMsg->info.handle;
  qwMsg.connInfo.ahandle = pMsg->info.ahandle;
  qwMsg.connInfo.refId = pMsg->info.refId;
D
dapan1121 已提交
547

L
Liu Jicong 已提交
548
  // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
D
dapan1121 已提交
549 550 551

_return:

S
shm  
Shengliang Guan 已提交
552
  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
D
dapan1121 已提交
553
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
554 555 556 557 558 559 560 561 562

  return TSDB_CODE_SUCCESS;
}

int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

L
Liu Jicong 已提交
563
  int32_t       code = 0;
D
dapan1121 已提交
564
  STaskDropReq *msg = pMsg->pCont;
D
dapan1121 已提交
565
  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
566

D
dapan1121 已提交
567
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
568
    QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
569
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
570
  }
D
dapan1121 已提交
571

D
dapan1121 已提交
572 573 574
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
D
dapan1121 已提交
575
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
576 577 578 579

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
580
  int64_t  rId = msg->refId;
D
dapan1121 已提交
581

D
dapan1121 已提交
582
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
S
Shengliang Guan 已提交
583 584 585
  qwMsg.connInfo.handle = pMsg->info.handle;
  qwMsg.connInfo.ahandle = pMsg->info.ahandle;
  qwMsg.connInfo.refId = pMsg->info.refId;
D
dapan1121 已提交
586

D
dapan1121 已提交
587
  if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
L
Liu Jicong 已提交
588
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
D
dapan1121 已提交
589
  }
D
dapan1121 已提交
590

S
Shengliang Guan 已提交
591
  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);
592 593 594

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
595
  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);
596 597

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
598 599
}

D
dapan1121 已提交
600 601 602 603 604
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

L
Liu Jicong 已提交
605
  int32_t         code = 0;
D
dapan1121 已提交
606
  SSchedulerHbReq req = {0};
D
dapan1121 已提交
607
  SQWorker   *mgmt = (SQWorker *)qWorkerMgmt;
L
Liu Jicong 已提交
608

D
dapan1121 已提交
609 610 611
  if (NULL == pMsg->pCont) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
L
Liu Jicong 已提交
612
  }
D
dapan1121 已提交
613 614 615 616 617 618 619 620

  if (tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  uint64_t sId = req.sId;
L
Liu Jicong 已提交
621
  SQWMsg   qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
S
Shengliang Guan 已提交
622 623 624
  qwMsg.connInfo.handle = pMsg->info.handle;
  qwMsg.connInfo.ahandle = pMsg->info.ahandle;
  qwMsg.connInfo.refId = pMsg->info.refId;
D
dapan1121 已提交
625

D
dapan1121 已提交
626
  if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
L
Liu Jicong 已提交
627
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
D
dapan1121 已提交
628 629
  }

S
Shengliang Guan 已提交
630
  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);
D
dapan1121 已提交
631 632 633 634 635 636 637 638

  QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
639 640 641 642 643
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

L
Liu Jicong 已提交
644
  int32_t          code = 0;
D
dapan1121 已提交
645
  SVShowTablesReq *pReq = pMsg->pCont;
S
shm  
Shengliang Guan 已提交
646
  QW_RET(qwBuildAndSendShowRsp(pMsg, code));
D
dapan1121 已提交
647 648 649 650 651 652 653 654
}

int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
S
shm  
Shengliang Guan 已提交
655
  QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
D
dapan1121 已提交
656
}