qworkerMsg.c 19.0 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2
#include "qworkerMsg.h"
#include "dataSinkMgt.h"
D
dapan1121 已提交
3 4 5
#include "executor.h"
#include "planner.h"
#include "query.h"
dengyihao's avatar
dengyihao 已提交
6
#include "qworker.h"
D
dapan1121 已提交
7
#include "qworkerInt.h"
dengyihao's avatar
dengyihao 已提交
8
#include "tcommon.h"
D
dapan1121 已提交
9 10 11 12 13
#include "tmsg.h"
#include "tname.h"

/**
 * Allocate the RPC buffer used for a fetch response.
 *
 * The buffer holds one SRetrieveTableRsp header followed by `length` payload
 * bytes and is obtained from rpcMallocCont(). Only the header part is zeroed.
 *
 * @param length  number of payload bytes to reserve behind the header
 * @param rsp     out: set to the new buffer on success, untouched on failure
 * @return TSDB_CODE_SUCCESS on success; on allocation failure
 *         TSDB_CODE_QRY_OUT_OF_MEMORY is handed to QW_RET.
 */
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
  const int32_t totalLen = sizeof(SRetrieveTableRsp) + length;

  SRetrieveTableRsp *buf = (SRetrieveTableRsp *)rpcMallocCont(totalLen);
  if (buf == NULL) {
    qError("rpcMallocCont %d failed", totalLen);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Clear the fixed header only; the payload area is left as allocated.
  memset(buf, 0, sizeof(*buf));
  *rsp = buf;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
28
/**
 * Fill the header of a fetch response from a data-sink output descriptor.
 *
 * @param msg        buffer that starts with an SRetrieveTableRsp header
 * @param input      source of row count, precision, compression flag and timing
 * @param len        value stored (after htonl) in the compLen field
 * @param qComplete  stored in the `completed` field
 */
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
  SRetrieveTableRsp *out = (SRetrieveTableRsp *)msg;

  // Multi-byte counters are byte-swapped with htonl/htobe64 before being stored.
  out->numOfRows = htonl(input->numOfRows);
  out->compLen = htonl(len);
  out->useconds = htobe64(input->useconds);

  // Single-value flags are copied as they are.
  out->precision = input->precision;
  out->compressed = input->compressed;
  out->completed = qComplete;
}

/**
 * Release a fetch response buffer with rpcFreeCont().
 *
 * @param msg  buffer to release; a NULL pointer is ignored
 */
void qwFreeFetchRsp(void *msg) {
  if (NULL == msg) {
    return;
  }

  rpcFreeCont(msg);
}

S
shm  
Shengliang Guan 已提交
45
/**
 * Serialize an SQueryTableRsp carrying `code` and send it as a
 * TDMT_VND_QUERY_RSP on the given connection.
 *
 * @param pConn  connection info (handle, ahandle, refId) copied into the RPC message
 * @param code   result code placed both in the payload and in the RPC header
 * @return TSDB_CODE_SUCCESS once the message was handed to tmsgSendRsp();
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if the payload could not be sized,
 *         allocated or serialized (nothing is sent in that case).
 */
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
  SQueryTableRsp rsp = {.code = code};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    qError("tSerializeSQueryTableRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *msg = rpcMallocCont(contLen);
  if (NULL == msg) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSQueryTableRsp(msg, contLen, &rsp) < 0) {
    qError("tSerializeSQueryTableRsp failed, size:%d", contLen);
    rpcFreeCont(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_QUERY_RSP,
      .handle = pConn->handle,
      .ahandle = pConn->ahandle,
      .refId = pConn->refId,
      .pCont = msg,
      .contLen = contLen,
      .code = code,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
67
/**
 * Send a TDMT_VND_RES_READY_RSP carrying `code` on the given connection.
 *
 * Unlike the other responses in this file, the RPC message is sent with a
 * NULL ahandle.
 *
 * @param pConn  connection info; handle and refId are copied into the RPC message
 * @param code   result code placed both in the payload and in the RPC header
 * @return TSDB_CODE_SUCCESS once the message was handed to tmsgSendRsp();
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if the payload could not be allocated.
 */
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
  SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
  if (NULL == pRsp) {
    // Writing pRsp->code through a NULL pointer would crash the worker.
    qError("rpcMallocCont %d failed", (int32_t)sizeof(SResReadyRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pRsp->code = code;

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_RES_READY_RSP,
      .handle = pConn->handle,
      .refId = pConn->refId,
      .ahandle = NULL,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
86
/**
 * Serialize an SExplainRsp and send it as a TDMT_VND_EXPLAIN_RSP.
 *
 * @param pConn     connection info (handle, ahandle, refId) copied into the RPC message
 * @param execInfo  per-subplan execution info referenced by the response
 * @param num       value stored in the response's numOfPlans field
 * @return TSDB_CODE_SUCCESS once the message was handed to tmsgSendRsp();
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if the payload could not be sized,
 *         allocated or serialized (nothing is sent in that case).
 */
int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {
  SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    qError("tSerializeSExplainRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSExplainRsp(pRsp, contLen, &rsp) < 0) {
    qError("tSerializeSExplainRsp failed, size:%d", contLen);
    rpcFreeCont(pRsp);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_EXPLAIN_RSP,
      .handle = pConn->handle,
      .ahandle = pConn->ahandle,
      .refId = pConn->refId,
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
108
/**
 * Serialize a scheduler heartbeat response and send it as a
 * TDMT_VND_QUERY_HEARTBEAT_RSP.
 *
 * @param pConn    connection info (handle, ahandle, refId) copied into the RPC message
 * @param pStatus  heartbeat response to serialize
 * @param code     result code placed in the RPC header
 * @return TSDB_CODE_SUCCESS once the message was handed to tmsgSendRsp();
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if the payload could not be sized,
 *         allocated or serialized (nothing is sent in that case).
 */
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
  if (contLen < 0) {
    qError("tSerializeSSchedulerHbRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus) < 0) {
    qError("tSerializeSSchedulerHbRsp failed, size:%d", contLen);
    rpcFreeCont(pRsp);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
      .handle = pConn->handle,
      .ahandle = pConn->ahandle,
      .refId = pConn->refId,
      .pCont = pRsp,
      .contLen = contLen,
      .code = code,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
128
/**
 * Send a TDMT_VND_FETCH_RSP on the given connection.
 *
 * When the caller has no response buffer (pRsp == NULL) an empty, zeroed
 * SRetrieveTableRsp header is allocated here and dataLength is forced to 0.
 *
 * @param pConn       connection info (handle, ahandle, refId) copied into the RPC message
 * @param pRsp        prepared response buffer, or NULL to send an empty header
 * @param dataLength  number of payload bytes that follow the header in pRsp
 * @param code        result code placed in the RPC header
 * @return TSDB_CODE_SUCCESS once the message was handed to tmsgSendRsp();
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if the empty header could not be allocated.
 */
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
  if (NULL == pRsp) {
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (NULL == pRsp) {
      // memset on a failed allocation would crash the worker.
      qError("rpcMallocCont %d failed", (int32_t)sizeof(SRetrieveTableRsp));
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_FETCH_RSP,
      .handle = pConn->handle,
      .ahandle = pConn->ahandle,
      .refId = pConn->refId,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp) + dataLength,
      .code = code,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
150
/**
 * Send a TDMT_VND_CANCEL_TASK_RSP carrying `code` on the given connection.
 *
 * @param pConn  connection info (handle, ahandle, refId) copied into the RPC message
 * @param code   result code placed both in the payload and in the RPC header
 * @return TSDB_CODE_SUCCESS once the message was handed to tmsgSendRsp();
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if the payload could not be allocated.
 */
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  if (NULL == pRsp) {
    // Writing pRsp->code through a NULL pointer would crash the worker.
    qError("rpcMallocCont %d failed", (int32_t)sizeof(STaskCancelRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pRsp->code = code;

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_CANCEL_TASK_RSP,
      .handle = pConn->handle,
      .ahandle = pConn->ahandle,
      .refId = pConn->refId,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
  };

  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
168
/**
 * Send a TDMT_VND_DROP_TASK_RSP carrying `code` on the given connection.
 *
 * @param pConn  connection info (handle, ahandle, refId) copied into the RPC message
 * @param code   result code placed both in the payload and in the RPC header
 * @return TSDB_CODE_SUCCESS once the message was handed to tmsgSendRsp();
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if the payload could not be allocated.
 */
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  if (NULL == pRsp) {
    // Writing pRsp->code through a NULL pointer would crash the worker.
    qError("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pRsp->code = code;

  SRpcMsg rpcRsp = {
      .msgType = TDMT_VND_DROP_TASK_RSP,
      .handle = pConn->handle,
      .ahandle = pConn->ahandle,
      .refId = pConn->refId,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = code,
  };

  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
186
/**
 * Build the fixed six-column "show tables" schema (name, created, columns,
 * stable, uid, vgId), serialize it as an SVShowTablesRsp and send it back on
 * the connection that delivered pMsg.
 *
 * @param pMsg  incoming request; its handle, ahandle and refId address the reply
 * @param code  result code placed in the RPC header
 * @return TSDB_CODE_SUCCESS once the message was handed to tmsgSendRsp();
 *         -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the schema array or the
 *         response buffer could not be built.
 */
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
  int32_t         numOfCols = 6;
  SVShowTablesRsp showRsp = {0};

  // showRsp.showId = 1;
  showRsp.tableMeta.pSchemas = taosMemoryCalloc(numOfCols, sizeof(SSchema));
  if (showRsp.tableMeta.pSchemas == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  col_id_t cols = 0;
  SSchema *pSchema = showRsp.tableMeta.pSchemas;

  // "name" and "stable" both reuse the type/width of the tbname column schema.
  const SSchema *s = tGetTbnameColumnSchema();
  *pSchema = createSchema(s->type, s->bytes, ++cols, "name");
  pSchema++;

  int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "created");
  pSchema++;

  type = TSDB_DATA_TYPE_SMALLINT;
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "columns");
  pSchema++;

  *pSchema = createSchema(s->type, s->bytes, ++cols, "stable");
  pSchema++;

  type = TSDB_DATA_TYPE_BIGINT;
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "uid");
  pSchema++;

  type = TSDB_DATA_TYPE_INT;
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "vgId");

  assert(cols == numOfCols);
  showRsp.tableMeta.numOfColumns = cols;

  // First pass with a NULL buffer only computes the encoded size.
  int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
  void   *pBuf = NULL;
  if (bufLen >= 0) {
    pBuf = rpcMallocCont(bufLen);
  }
  if (pBuf != NULL && tSerializeSShowRsp(pBuf, bufLen, &showRsp) < 0) {
    rpcFreeCont(pBuf);
    pBuf = NULL;
  }

  // The schema array is only an input to serialization; release it on every
  // path so it is not leaked once the encoded copy exists (or failed).
  taosMemoryFree(showRsp.tableMeta.pSchemas);
  showRsp.tableMeta.pSchemas = NULL;

  if (NULL == pBuf) {
    qError("failed to build show rsp, size:%d", bufLen);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SRpcMsg rpcMsg = {
      .handle = pMsg->handle,
      .ahandle = pMsg->ahandle,
      .refId = pMsg->refId,
      .pCont = pBuf,
      .contLen = bufLen,
      .code = code,
  };

  tmsgSendRsp(&rpcMsg);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
242
/**
 * Reply to a "show tables" fetch request with an empty result
 * (numOfRows == 0) on the connection that delivered pMsg.
 *
 * @param pMsg       incoming request; its handle, ahandle and refId address the reply
 * @param pFetchReq  decoded fetch request; currently not consulted
 * @return TSDB_CODE_SUCCESS once the message was handed to tmsgSendRsp();
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if the response could not be allocated.
 */
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchReq) {
  (void)pFetchReq;  // nothing in the request influences the empty reply

  SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(SVShowTablesFetchRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Zero the whole header (as the fetch-response helpers in this file do) so
  // no field other than numOfRows is sent with an indeterminate value.
  memset(pRsp, 0, sizeof(*pRsp));
  pRsp->numOfRows = 0;

  SRpcMsg rpcMsg = {
      .handle = pMsg->handle,
      .ahandle = pMsg->ahandle,
      .refId = pMsg->refId,
      .pCont = pRsp,
      .contLen = sizeof(*pRsp),
      .code = 0,
  };

  tmsgSendRsp(&rpcMsg);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
260
/**
 * Queue a TDMT_VND_QUERY_CONTINUE request for the current task on the query
 * queue of this worker.
 *
 * The request fields (vgId, sId, queryId, taskId) are stored without byte
 * swapping. On queueing failure the request buffer is released here.
 *
 * @param pConn  connection info (handle, ahandle, refId) copied into the message
 * @return TSDB_CODE_SUCCESS when the message was queued; otherwise the
 *         allocation or tmsgPutToQueue() error is passed to QW_ERR_RET.
 */
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
  SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
  if (req == NULL) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->taskId = tId;
  req->queryId = qId;
  req->sId = sId;
  req->header.vgId = mgmt->nodeId;

  SRpcMsg continueMsg = {
      .handle = pConn->handle,
      .ahandle = pConn->ahandle,
      .msgType = TDMT_VND_QUERY_CONTINUE,
      .refId = pConn->refId,
      .pCont = req,
      .contLen = sizeof(SQueryContinueReq),
      .code = 0,
  };

  int32_t ret = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &continueMsg);
  if (ret != TSDB_CODE_SUCCESS) {
    QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(ret));
    // The queue did not take the buffer, so it has to be released here.
    rpcFreeCont(req);
    QW_ERR_RET(ret);
  }

  QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
294
/**
 * Register a pre-built TDMT_VND_DROP_TASK message for the current task via
 * tmsgRegisterBrokenLinkArg(), tagged with TSDB_CODE_RPC_NETWORK_UNAVAIL.
 *
 * The request fields are stored byte-swapped (htonl/htobe64), matching the
 * decoding done by the drop-message handler in this file.
 *
 * @param pConn  connection info (handle, ahandle, refId) copied into the message
 * @return TSDB_CODE_SUCCESS after registration; on allocation failure
 *         TSDB_CODE_QRY_OUT_OF_MEMORY is passed to QW_ERR_RET.
 */
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
  STaskDropReq *dropReq = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (dropReq == NULL) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  dropReq->refId = htobe64(rId);
  dropReq->taskId = htobe64(tId);
  dropReq->queryId = htobe64(qId);
  dropReq->sId = htobe64(sId);
  dropReq->header.vgId = htonl(mgmt->nodeId);

  SRpcMsg brokenLinkMsg = {
      .handle = pConn->handle,
      .ahandle = pConn->ahandle,
      .refId = pConn->refId,
      .msgType = TDMT_VND_DROP_TASK,
      .pCont = dropReq,
      .contLen = sizeof(STaskDropReq),
      .code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
  };

  tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &brokenLinkMsg);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
/**
 * Register a pre-built TDMT_VND_QUERY_HEARTBEAT message for scheduler `sId`
 * via tmsgRegisterBrokenLinkArg(), tagged with TSDB_CODE_RPC_NETWORK_UNAVAIL.
 *
 * @param mgmt   worker management object; supplies nodeId and the message callbacks
 * @param sId    scheduler id stored in the heartbeat request
 * @param pConn  connection info (handle, ahandle, refId) copied into the message
 * @return TSDB_CODE_SUCCESS after registration; TSDB_CODE_QRY_OUT_OF_MEMORY
 *         (through QW_ERR_RET) if the request could not be sized, allocated
 *         or serialized.
 */
int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn) {
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  // First pass with a NULL buffer only computes the encoded size.
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("rpcMallocCont %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    // The buffer came from rpcMallocCont(), so it must go back through
    // rpcFreeCont() rather than the general-purpose allocator.
    rpcFreeCont(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg pMsg = {
      .handle = pConn->handle,
      .ahandle = pConn->ahandle,
      .refId = pConn->refId,
      .msgType = TDMT_VND_QUERY_HEARTBEAT,
      .pCont = msg,
      .contLen = msgSize,
      .code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
  };

  tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
358 359 360 361 362
/**
 * Entry point for a sub-query request (SSubQueryMsg).
 *
 * The message header is converted in place from big-endian, the lengths of
 * the trailing SQL text and physical plan are validated against the received
 * size, and the plan is handed to qwProcessQuery().
 *
 * @param node         opaque node pointer forwarded in the SQWMsg
 * @param qWorkerMgmt  SQWorkerMgmt instance for this worker
 * @param pMsg         received RPC message; pCont is modified in place
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_QRY_INVALID_INPUT for NULL
 *         arguments or a malformed message; otherwise the qwProcessQuery()
 *         error (all through QW_ERR_RET).
 */
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SSubQueryMsg *msg = pMsg->pCont;
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;

  // Compare as signed 64-bit so a negative contLen cannot slip through by
  // being promoted to a huge unsigned value.
  if (NULL == msg || (int64_t)pMsg->contLen <= (int64_t)sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
  msg->phyLen = ntohl(msg->phyLen);
  msg->sqlLen = ntohl(msg->sqlLen);

  // The SQL text and the physical plan are read back to back from msg->msg.
  // Both lengths come from the wire, so make sure they stay inside the bytes
  // that were actually received before any of them is used as an offset.
  uint64_t bodyLen = (uint64_t)pMsg->contLen - (uint64_t)sizeof(*msg);
  if ((uint64_t)msg->sqlLen > bodyLen || (uint64_t)msg->phyLen > bodyLen - (uint64_t)msg->sqlLen) {
    QW_ELOG("invalid query msg, msgLen:%d, sqlLen:%d, phyLen:%d", pMsg->contLen, (int32_t)msg->sqlLen,
            (int32_t)msg->phyLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // These names are picked up by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;

  SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen};
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;
  qwMsg.connInfo.refId = pMsg->refId;

  // The SQL text is not NUL-terminated inside the message, so copy it for
  // logging; fall back to an empty string if the copy could not be made.
  char *sql = strndup(msg->msg, msg->sqlLen);
  QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql ? sql : "");
  taosMemoryFreeClear(sql);

  QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));

  QW_SCH_TASK_DLOG("processQuery end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

400
/**
 * Entry point for a query-continue request (SQueryContinueReq).
 *
 * The request ids are read without byte swapping, which matches how
 * qwBuildAndSendCQueryMsg() in this file fills them in.
 *
 * @param node         opaque node pointer forwarded in the SQWMsg
 * @param qWorkerMgmt  SQWorkerMgmt instance for this worker
 * @param pMsg         received RPC message
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_QRY_INVALID_INPUT for NULL
 *         arguments or a too-short message; otherwise the qwProcessCQuery()
 *         error (through QW_ERR_RET).
 */
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  // Same argument guard as the other handlers in this file; without it
  // pMsg->pCont below would be read through a NULL pointer.
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
  SQWorkerMgmt      *mgmt = (SQWorkerMgmt *)qWorkerMgmt;

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // These names are picked up by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = 0;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;
  qwMsg.connInfo.refId = pMsg->refId;

  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle);

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
433
/**
 * Entry point for a result-ready request (SResReadyReq).
 *
 * The ids in the message are converted in place from big-endian and the
 * request is forwarded to qwProcessReady().
 *
 * @param node         opaque node pointer forwarded in the SQWMsg
 * @param qWorkerMgmt  SQWorkerMgmt instance for this worker
 * @param pMsg         received RPC message; pCont is modified in place
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_QRY_INVALID_INPUT for NULL
 *         arguments or a too-short message; otherwise the qwProcessReady()
 *         error (through QW_ERR_RET).
 */
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  SResReadyReq *msg = pMsg->pCont;

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Decode in place, then mirror the ids into the locals that QW_FPARAMS()
  // and the QW_SCH_TASK_* log macros refer to by name.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;

  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;

  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;

  int64_t rId = 0;

  SQWMsg qwMsg = {
      .node = node,
      .msg = NULL,
      .msgLen = 0,
      .connInfo.handle = pMsg->handle,
      .connInfo.ahandle = pMsg->ahandle,
      .connInfo.refId = pMsg->refId,
  };

  QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle);

  QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processReady end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

/**
 * Entry point for a task-status request (SSchTasksStatusReq).
 *
 * Only validation and in-place decoding of the scheduler id are performed;
 * collecting and sending the status is not implemented here (the calls are
 * commented out below), so a valid request is accepted without a reply.
 *
 * @param node         opaque node pointer; only checked for NULL
 * @param qWorkerMgmt  worker management object; only checked for NULL
 * @param pMsg         received RPC message; pCont is modified in place
 * @return TSDB_CODE_SUCCESS for a well-formed message;
 *         TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or a too-short message.
 */
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SSchTasksStatusReq *msg = pMsg->pCont;
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    qError("invalid task status msg");
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Incoming field: decode from big-endian, like the other handlers do.
  msg->sId = be64toh(msg->sId);

  // QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
  // QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));

  return TSDB_CODE_SUCCESS;
}

/**
 * Entry point for a result-fetch request (SResFetchReq).
 *
 * The ids in the message are converted in place from big-endian and the
 * request is forwarded to qwProcessFetch().
 *
 * @param node         opaque node pointer forwarded in the SQWMsg
 * @param qWorkerMgmt  SQWorkerMgmt instance for this worker
 * @param pMsg         received RPC message; pCont is modified in place
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_QRY_INVALID_INPUT for NULL
 *         arguments or a too-short message; otherwise the qwProcessFetch()
 *         error (through QW_ERR_RET).
 */
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  SResFetchReq *msg = pMsg->pCont;

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Decode in place, then mirror the ids into the locals that QW_FPARAMS()
  // and the QW_SCH_TASK_* log macros refer to by name.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;

  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;

  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;

  int64_t rId = 0;

  SQWMsg qwMsg = {
      .node = node,
      .msg = NULL,
      .msgLen = 0,
      .connInfo.handle = pMsg->handle,
      .connInfo.ahandle = pMsg->ahandle,
      .connInfo.refId = pMsg->refId,
  };

  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle);

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang 已提交
531
/**
 * Entry point for a fetch response: forwards the RPC message to
 * qProcessFetchRsp() with NULL for its first and third arguments.
 *
 * @param node         not used
 * @param qWorkerMgmt  not used
 * @param pMsg         received RPC message, passed through unchanged
 * @return always TSDB_CODE_SUCCESS
 */
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  (void)node;
  (void)qWorkerMgmt;

  qProcessFetchRsp(NULL, pMsg, NULL);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
536 537 538 539 540
/**
 * Entry point for a task-cancel request (STaskCancelReq).
 *
 * The ids in the message are converted in place from big-endian. The actual
 * cancel call is commented out, so the handler only sends a cancel response
 * carrying code 0 back on the incoming connection.
 *
 * @param node         opaque node pointer forwarded in the SQWMsg
 * @param qWorkerMgmt  SQWorkerMgmt instance for this worker
 * @param pMsg         received RPC message; pCont is modified in place
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_QRY_INVALID_INPUT for NULL
 *         arguments or a too-short message; otherwise the
 *         qwBuildAndSendCancelRsp() error (through QW_ERR_RET).
 */
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t         code = 0;
  SQWorkerMgmt   *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  STaskCancelReq *msg = pMsg->pCont;

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    qError("invalid task cancel msg");
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Decode in place, then mirror the ids into the locals that the
  // QW_SCH_TASK_* log macros refer to by name.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;

  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;

  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;

  msg->refId = be64toh(msg->refId);
  int64_t rId = msg->refId;

  SQWMsg qwMsg = {
      .node = node,
      .msg = NULL,
      .msgLen = 0,
      .connInfo.handle = pMsg->handle,
      .connInfo.ahandle = pMsg->ahandle,
      .connInfo.refId = pMsg->refId,
  };

  // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));

_return:

  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));

  return TSDB_CODE_SUCCESS;
}

/**
 * Entry point for a task-drop request (STaskDropReq).
 *
 * The ids in the message are converted in place from big-endian and the
 * request, together with the RPC-level code, is forwarded to qwProcessDrop().
 * A message whose code is TSDB_CODE_RPC_NETWORK_UNAVAIL is additionally
 * logged as a drop caused by a broken link.
 *
 * @param node         opaque node pointer forwarded in the SQWMsg
 * @param qWorkerMgmt  SQWorkerMgmt instance for this worker
 * @param pMsg         received RPC message; pCont is modified in place
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_QRY_INVALID_INPUT for NULL
 *         arguments or a too-short message; otherwise the qwProcessDrop()
 *         error (through QW_ERR_RET).
 */
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  STaskDropReq *msg = pMsg->pCont;

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Decode in place, then mirror the ids into the locals that QW_FPARAMS()
  // and the QW_SCH_TASK_* log macros refer to by name.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;

  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;

  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;

  msg->refId = be64toh(msg->refId);
  int64_t rId = msg->refId;

  SQWMsg qwMsg = {
      .node = node,
      .msg = NULL,
      .msgLen = 0,
      .code = pMsg->code,
      .connInfo.handle = pMsg->handle,
      .connInfo.ahandle = pMsg->ahandle,
      .connInfo.refId = pMsg->refId,
  };

  if (pMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle);

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
616 617 618 619 620
/**
 * Entry point for a scheduler heartbeat request.
 *
 * The payload is decoded with tDeserializeSSchedulerHbReq() and passed,
 * together with the RPC-level code, to qwProcessHb(). A message whose code is
 * TSDB_CODE_RPC_NETWORK_UNAVAIL is additionally logged as caused by a broken
 * link.
 *
 * @param node         opaque node pointer forwarded in the SQWMsg
 * @param qWorkerMgmt  SQWorkerMgmt instance for this worker
 * @param pMsg         received RPC message
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_QRY_INVALID_INPUT for NULL
 *         arguments, a missing payload or a payload that fails to decode;
 *         otherwise the qwProcessHb() error (through QW_ERR_RET).
 */
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorkerMgmt   *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  SSchedulerHbReq req = {0};

  if (pMsg->pCont == NULL) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (0 != tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    // Release whatever the decoder filled in before it gave up.
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // sId is referenced by name from the QW_SCH_* log macros.
  uint64_t sId = req.sId;

  SQWMsg qwMsg = {
      .node = node,
      .msg = NULL,
      .msgLen = 0,
      .code = pMsg->code,
      .connInfo.handle = pMsg->handle,
      .connInfo.ahandle = pMsg->ahandle,
      .connInfo.refId = pMsg->refId,
  };

  if (pMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle);

  QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
655 656 657 658 659
/**
 * Entry point for a "show tables" request: replies with the fixed schema
 * built by qwBuildAndSendShowRsp(), using result code 0.
 *
 * @param node         opaque node pointer; only checked for NULL
 * @param qWorkerMgmt  worker management object; only checked for NULL
 * @param pMsg         received RPC message that addresses the reply
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments; otherwise the
 *         qwBuildAndSendShowRsp() result as passed through QW_RET.
 */
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  // The request payload is not inspected; the reply does not depend on it.
  const int32_t rspCode = 0;

  QW_RET(qwBuildAndSendShowRsp(pMsg, rspCode));
}

/**
 * Entry point for a "show tables" fetch request: hands the payload to
 * qwBuildAndSendShowFetchRsp(), which sends the reply.
 *
 * @param node         opaque node pointer; only checked for NULL
 * @param qWorkerMgmt  worker management object; only checked for NULL
 * @param pMsg         received RPC message; pCont is treated as an
 *                     SVShowTablesFetchReq
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments; otherwise the
 *         qwBuildAndSendShowFetchRsp() result as passed through QW_RET.
 */
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SVShowTablesFetchReq *fetchReq = (SVShowTablesFetchReq *)pMsg->pCont;

  QW_RET(qwBuildAndSendShowFetchRsp(pMsg, fetchReq));
}