qworkerMsg.c 18.3 KB
Newer Older
D
dapan1121 已提交
1
#include "qworker.h"
S
common  
Shengliang Guan 已提交
2
#include "tcommon.h"
D
dapan1121 已提交
3 4 5 6
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qworkerInt.h"
D
dapan1121 已提交
7
#include "qworkerMsg.h"
D
dapan1121 已提交
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#include "tmsg.h"
#include "tname.h"
#include "dataSinkMgt.h"


// Allocates an RPC content buffer that holds an SRetrieveTableRsp header
// followed by `length` bytes of payload, zeroes the header part only, and
// hands the buffer back through *rsp.
//
// Returns TSDB_CODE_SUCCESS on success. When rpcMallocCont fails the error is
// logged, TSDB_CODE_QRY_OUT_OF_MEMORY is returned through QW_RET and *rsp is
// left untouched.
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
  int32_t totalLen = sizeof(SRetrieveTableRsp) + length;

  SRetrieveTableRsp *fetchRsp = (SRetrieveTableRsp *)rpcMallocCont(totalLen);
  if (NULL != fetchRsp) {
    // Only the fixed header is cleared here; the payload bytes are not touched.
    memset(fetchRsp, 0, sizeof(SRetrieveTableRsp));
    *rsp = fetchRsp;
    return TSDB_CODE_SUCCESS;
  }

  qError("rpcMallocCont %d failed", totalLen);
  QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

D
dapan1121 已提交
29 30


D
dapan1121 已提交
31
// Fills the SRetrieveTableRsp header located at `msg` from the sink output.
//
//   msg       - buffer that starts with an SRetrieveTableRsp header
//   input     - output descriptor supplying useconds, precision, compressed
//               and numOfRows
//   len       - value stored (via htonl) into compLen
//   qComplete - value stored into the completed flag
//
// useconds is converted with htobe64, compLen/numOfRows with htonl; the
// remaining fields are copied as they are.
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
  SRetrieveTableRsp *fetchRsp = (SRetrieveTableRsp *)msg;

  // Fields that need a byte-order conversion.
  fetchRsp->numOfRows = htonl(input->numOfRows);
  fetchRsp->compLen = htonl(len);
  fetchRsp->useconds = htobe64(input->useconds);

  // Fields copied verbatim.
  fetchRsp->compressed = input->compressed;
  fetchRsp->precision = input->precision;
  fetchRsp->completed = qComplete;
}


// Releases a fetch response buffer with rpcFreeCont. A NULL `msg` is ignored.
void qwFreeFetchRsp(void *msg) {
  if (NULL == msg) {
    return;
  }

  rpcFreeCont(msg);
}

S
shm  
Shengliang Guan 已提交
49
// Serializes an SQueryTableRsp carrying `code` and sends it as a
// TDMT_VND_QUERY_RSP on the connection described by pConn.
//
// Returns TSDB_CODE_SUCCESS once the response has been handed to tmsgSendRsp,
// or TSDB_CODE_QRY_OUT_OF_MEMORY when the response cannot be sized, allocated
// or serialized (nothing is sent in that case).
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
  SQueryTableRsp rsp = {.code = code};

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    qError("tSerializeSQueryTableRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *msg = rpcMallocCont(contLen);
  if (NULL == msg) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSQueryTableRsp(msg, contLen, &rsp) < 0) {
    qError("tSerializeSQueryTableRsp failed, size:%d", contLen);
    rpcFreeCont(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
    .msgType = TDMT_VND_QUERY_RSP,
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .pCont   = msg,
    .contLen = contLen,
    .code    = code,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
70
// Sends a TDMT_VND_RES_READY_RSP whose body is an SResReadyRsp carrying
// `code`. Unlike the other responses in this file, ahandle is sent as NULL.
//
// Returns TSDB_CODE_SUCCESS once the response has been handed to tmsgSendRsp,
// or TSDB_CODE_QRY_OUT_OF_MEMORY when the response buffer cannot be allocated.
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
  SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(SResReadyRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pRsp->code = code;

  SRpcMsg rpcRsp = {
    .msgType = TDMT_VND_RES_READY_RSP,
    .handle  = pConn->handle,
    .ahandle = NULL,
    .pCont   = pRsp,
    .contLen = sizeof(*pRsp),
    .code    = code,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
// Serializes an SExplainRsp that references the `num` entries of `execInfo`
// and sends it as a TDMT_VND_EXPLAIN_RSP with RPC code 0.
//
// Returns TSDB_CODE_SUCCESS once the response has been handed to tmsgSendRsp,
// or TSDB_CODE_QRY_OUT_OF_MEMORY when the response cannot be sized, allocated
// or serialized (nothing is sent in that case).
int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {
  SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    qError("tSerializeSExplainRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSExplainRsp(pRsp, contLen, &rsp) < 0) {
    qError("tSerializeSExplainRsp failed, size:%d", contLen);
    rpcFreeCont(pRsp);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
    .msgType = TDMT_VND_EXPLAIN_RSP,
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .pCont   = pRsp,
    .contLen = contLen,
    .code    = 0,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
109
// Serializes the heartbeat status in pStatus and sends it as a
// TDMT_VND_QUERY_HEARTBEAT_RSP with RPC code `code`.
//
// Returns TSDB_CODE_SUCCESS once the response has been handed to tmsgSendRsp,
// or TSDB_CODE_QRY_OUT_OF_MEMORY when the response cannot be sized, allocated
// or serialized (nothing is sent in that case).
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
  if (contLen < 0) {
    qError("tSerializeSSchedulerHbRsp failed, size:%d", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  void *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", contLen);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus) < 0) {
    qError("tSerializeSSchedulerHbRsp failed, size:%d", contLen);
    rpcFreeCont(pRsp);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg rpcRsp = {
    .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .pCont   = pRsp,
    .contLen = contLen,
    .code    = code,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
128
// Sends a TDMT_VND_FETCH_RSP with RPC code `code`.
//
//   pRsp       - prepared response buffer (header plus dataLength payload
//                bytes), or NULL to send an all-zero header with no payload
//   dataLength - payload size following the header; forced to 0 when pRsp is
//                NULL
//
// Returns TSDB_CODE_SUCCESS once the response has been handed to tmsgSendRsp,
// or TSDB_CODE_QRY_OUT_OF_MEMORY when the placeholder header cannot be
// allocated.
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
  if (NULL == pRsp) {
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (NULL == pRsp) {
      qError("rpcMallocCont %d failed", (int32_t)sizeof(SRetrieveTableRsp));
      QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
    .msgType = TDMT_VND_FETCH_RSP,
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .pCont   = pRsp,
    .contLen = sizeof(*pRsp) + dataLength,
    .code    = code,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
149
// Sends a TDMT_VND_CANCEL_TASK_RSP whose body is an STaskCancelRsp carrying
// `code`.
//
// Returns TSDB_CODE_SUCCESS once the response has been handed to tmsgSendRsp,
// or TSDB_CODE_QRY_OUT_OF_MEMORY when the response buffer cannot be allocated.
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(STaskCancelRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pRsp->code = code;

  SRpcMsg rpcRsp = {
    .msgType = TDMT_VND_CANCEL_TASK_RSP,
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .pCont   = pRsp,
    .contLen = sizeof(*pRsp),
    .code    = code,
  };

  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
166
// Sends a TDMT_VND_DROP_TASK_RSP whose body is an STaskDropRsp carrying
// `code`.
//
// Returns TSDB_CODE_SUCCESS once the response has been handed to tmsgSendRsp,
// or TSDB_CODE_QRY_OUT_OF_MEMORY when the response buffer cannot be allocated.
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pRsp->code = code;

  SRpcMsg rpcRsp = {
    .msgType = TDMT_VND_DROP_TASK_RSP,
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .pCont   = pRsp,
    .contLen = sizeof(*pRsp),
    .code    = code,
  };

  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
183
// Builds a fixed six-column table-meta description (name, created, columns,
// stable, uid, vgId), serializes it as a show response and replies on the
// handle/ahandle of the request `pMsg` with RPC code `code`.
//
// Returns TSDB_CODE_SUCCESS once the response has been handed to tmsgSendRsp.
// On allocation failure terrno is set to TSDB_CODE_OUT_OF_MEMORY, -1 is
// returned and nothing is sent.
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
  int32_t         numOfCols = 6;
  SVShowTablesRsp showRsp = {0};

  showRsp.tableMeta.pSchemas = taosMemoryCalloc(numOfCols, sizeof(SSchema));
  if (showRsp.tableMeta.pSchemas == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  col_id_t cols = 0;
  SSchema *pSchema = showRsp.tableMeta.pSchemas;

  // "name" and "stable" reuse the type/width of the table-name column schema.
  const SSchema *s = tGetTbnameColumnSchema();
  *pSchema = createSchema(s->type, s->bytes, ++cols, "name");
  pSchema++;

  int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "created");
  pSchema++;

  type = TSDB_DATA_TYPE_SMALLINT;
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "columns");
  pSchema++;

  *pSchema = createSchema(s->type, s->bytes, ++cols, "stable");
  pSchema++;

  type = TSDB_DATA_TYPE_BIGINT;
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "uid");
  pSchema++;

  type = TSDB_DATA_TYPE_INT;
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "vgId");

  assert(cols == numOfCols);
  showRsp.tableMeta.numOfColumns = cols;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
  void   *pBuf = (bufLen >= 0) ? rpcMallocCont(bufLen) : NULL;
  if (NULL == pBuf) {
    taosMemoryFree(showRsp.tableMeta.pSchemas);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tSerializeSShowRsp(pBuf, bufLen, &showRsp);

  // The schema array is only used as serializer input in this function;
  // release it here so it is not leaked on every call.
  taosMemoryFree(showRsp.tableMeta.pSchemas);
  showRsp.tableMeta.pSchemas = NULL;

  SRpcMsg rpcMsg = {
      .handle = pMsg->handle,
      .ahandle = pMsg->ahandle,
      .pCont = pBuf,
      .contLen = bufLen,
      .code = code,
  };

  tmsgSendRsp(&rpcMsg);
  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
238
// Replies to a show-fetch request with an SVShowTablesFetchRsp reporting zero
// rows, using the handle/ahandle of `pMsg` and RPC code 0.
//
// pFetchReq is currently not consulted when building the reply.
//
// Returns TSDB_CODE_SUCCESS once the response has been handed to tmsgSendRsp,
// or TSDB_CODE_QRY_OUT_OF_MEMORY when the response buffer cannot be allocated.
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
  (void)pFetchReq;

  SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", (int32_t)sizeof(SVShowTablesFetchRsp));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Clear the whole struct so no uninitialised bytes are sent to the peer.
  memset(pRsp, 0, sizeof(*pRsp));
  pRsp->numOfRows = 0;

  SRpcMsg rpcMsg = {
      .handle  = pMsg->handle,
      .ahandle = pMsg->ahandle,
      .pCont   = pRsp,
      .contLen = sizeof(*pRsp),
      .code    = 0,
  };

  tmsgSendRsp(&rpcMsg);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
255
// Builds an SQueryContinueReq for the task identified by the QW_FPARAMS_DEF
// arguments and puts it on QUERY_QUEUE through mgmt->msgCb as a
// TDMT_VND_QUERY_CONTINUE message.
//
// The ids are stored without any byte-order conversion.
//
// Returns TSDB_CODE_SUCCESS when the message was queued,
// TSDB_CODE_QRY_OUT_OF_MEMORY when the request cannot be allocated, or the
// code reported by tmsgPutToQueue (the request buffer is freed in that case).
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
  SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->taskId = tId;
  req->queryId = qId;
  req->sId = sId;
  req->header.vgId = mgmt->nodeId;

  SRpcMsg continueMsg = {
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .msgType = TDMT_VND_QUERY_CONTINUE,
    .pCont   = req,
    .contLen = sizeof(SQueryContinueReq),
    .code    = 0,
  };

  int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &continueMsg);
  if (TSDB_CODE_SUCCESS == code) {
    QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);
    return TSDB_CODE_SUCCESS;
  }

  // Queueing failed: the buffer is released here before the error is returned.
  QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
  rpcFreeCont(req);
  QW_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
288

D
dapan1121 已提交
289
// Prepares a TDMT_VND_DROP_TASK message for the task identified by the
// QW_FPARAMS_DEF arguments and registers it with tmsgRegisterBrokenLinkArg,
// tagged with TSDB_CODE_RPC_NETWORK_UNAVAIL.
//
// All ids are stored in network byte order (htonl / htobe64).
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the request
// buffer cannot be allocated.
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
  STaskDropReq *dropReq = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (NULL == dropReq) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  dropReq->refId = htobe64(rId);
  dropReq->taskId = htobe64(tId);
  dropReq->queryId = htobe64(qId);
  dropReq->sId = htobe64(sId);
  dropReq->header.vgId = htonl(mgmt->nodeId);

  SRpcMsg brokenLinkMsg = {
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .msgType = TDMT_VND_DROP_TASK,
    .pCont   = dropReq,
    .contLen = sizeof(STaskDropReq),
    .code    = TSDB_CODE_RPC_NETWORK_UNAVAIL,
  };

  tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &brokenLinkMsg);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
// Serializes an SSchedulerHbReq for scheduler `sId` on this node and registers
// it with tmsgRegisterBrokenLinkArg as a TDMT_VND_QUERY_HEARTBEAT message
// tagged with TSDB_CODE_RPC_NETWORK_UNAVAIL.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the request
// cannot be sized, allocated or serialized.
int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn) {
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("rpcMallocCont %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    // The buffer came from rpcMallocCont, so it must go back via rpcFreeCont.
    rpcFreeCont(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SRpcMsg pMsg = {
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .msgType = TDMT_VND_QUERY_HEARTBEAT,
    .pCont   = msg,
    // Length of the serialized bytes actually placed in `msg`, not the size of
    // the in-memory struct.
    .contLen = msgSize,
    .code    = TSDB_CODE_RPC_NETWORK_UNAVAIL,
  };

  tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
352

D
dapan1121 已提交
353 354 355 356 357 358 359 360 361 362
// Handles an incoming sub-query message.
//
// The SSubQueryMsg header in pMsg->pCont is converted to host byte order in
// place, the declared sql/plan lengths are validated against the received
// length, and the plan part (msg->msg + sqlLen, phyLen bytes) is passed to
// qwProcessQuery together with the connection handles.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or
// a malformed message, or the error reported by qwProcessQuery.
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SSubQueryMsg *msg = pMsg->pCont;
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;

  // A negative length must be rejected explicitly: compared against sizeof()
  // it would be converted to a huge unsigned value and slip through.
  if (NULL == msg || pMsg->contLen < 0 || (size_t)pMsg->contLen <= sizeof(*msg)) {
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  msg->sId     = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId  = be64toh(msg->taskId);
  msg->refId   = be64toh(msg->refId);
  msg->phyLen  = ntohl(msg->phyLen);
  msg->sqlLen  = ntohl(msg->sqlLen);

  // sqlLen and phyLen come from the peer; both regions must lie inside the
  // bytes that were actually received before msg->msg is read.
  uint64_t payloadLen = (uint64_t)pMsg->contLen - sizeof(*msg);
  uint64_t sqlLen = (uint64_t)msg->sqlLen;
  uint64_t phyLen = (uint64_t)msg->phyLen;
  if (sqlLen > payloadLen || phyLen > payloadLen - sqlLen) {
    QW_ELOG("invalid query msg, sqlLen:%u, phyLen:%u, msgLen:%d", (uint32_t)msg->sqlLen, (uint32_t)msg->phyLen,
            pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // These locals are consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;

  SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen};
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;

  // NUL-terminated copy of the sql text, used for logging only.
  char* sql = strndup(msg->msg, msg->sqlLen);
  QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, (NULL != sql) ? sql : "NULL");
  taosMemoryFreeClear(sql);

  QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));

  QW_SCH_TASK_DLOG("processQuery end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

394
// Handles a query-continue message.
//
// The ids are read from the SQueryContinueReq as they are, without byte-order
// conversion (qwBuildAndSendCQueryMsg in this file stores them unconverted),
// and are forwarded to qwProcessCQuery with the connection handles.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or
// a message shorter than SQueryContinueReq, or the error reported by
// qwProcessCQuery.
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  // Same argument guard as the other qWorkerProcess*Msg handlers; pMsg must be
  // checked before pMsg->pCont is read.
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // These locals are consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t rId = 0;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;

  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle);

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

// Handles a result-ready request.
//
// The ids in the SResReadyReq are converted to host byte order in place and
// forwarded, with the connection handles, to qwProcessReady.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or
// a message shorter than SResReadyReq, or the error reported by
// qwProcessReady.
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
  if (!(node && qWorkerMgmt && pMsg)) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  SResReadyReq *msg = pMsg->pCont;
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Decode in place, then snapshot into the locals that QW_FPARAMS() and the
  // QW_SCH_TASK_* log macros consume.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;

  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;

  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;

  int64_t rId = 0;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
  qwMsg.connInfo.ahandle = pMsg->ahandle;
  qwMsg.connInfo.handle = pMsg->handle;

  QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle);

  QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processReady end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

// Handles a task-status request.
//
// Currently this only validates the request and converts its sId field to host
// byte order in place; no status is collected and no response is sent.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_INVALID_INPUT for NULL arguments
// or a message shorter than SSchTasksStatusReq.
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SSchTasksStatusReq *msg = pMsg->pCont;
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    qError("invalid task status msg");
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Incoming field: big-endian wire order -> host order, as in the other
  // handlers of this file.
  msg->sId = be64toh(msg->sId);

  // TODO: status reporting is not implemented yet. The disabled code collected
  // the scheduler's task status (qwGetSchTasksStatus) and replied with it
  // (qwBuildAndSendStatusRsp).

  return TSDB_CODE_SUCCESS;
}

// Handles a fetch request.
//
// The ids in the SResFetchReq are converted to host byte order in place and
// forwarded, with the connection handles, to qwProcessFetch.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or
// a message shorter than SResFetchReq, or the error reported by
// qwProcessFetch.
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (!(node && qWorkerMgmt && pMsg)) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  SResFetchReq *msg = pMsg->pCont;

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);  
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Decode in place, then snapshot into the locals that QW_FPARAMS() and the
  // QW_SCH_TASK_* log macros consume.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;

  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;

  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;

  int64_t rId = 0;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
  qwMsg.connInfo.ahandle = pMsg->ahandle;
  qwMsg.connInfo.handle = pMsg->handle;

  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle);

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang 已提交
522
// Forwards a fetch response to qProcessFetchRsp (first and third arguments
// NULL). `node` and `qWorkerMgmt` are not used, and whatever qProcessFetchRsp
// returns is ignored: this handler always reports TSDB_CODE_SUCCESS.
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  (void)node;
  (void)qWorkerMgmt;

  qProcessFetchRsp(NULL, pMsg, NULL);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
527 528 529 530 531
// Handles a task-cancel request.
//
// The ids in the STaskCancelReq are converted to host byte order in place.
// No cancellation is performed at the moment (the call is disabled); a cancel
// response with code 0 is sent back on the request's connection.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or
// a message shorter than STaskCancelReq, or the error reported by
// qwBuildAndSendCancelRsp.
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (!(node && qWorkerMgmt && pMsg)) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  int32_t code = 0;
  STaskCancelReq *msg = pMsg->pCont;
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    qError("invalid task cancel msg");  
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Decode in place, then snapshot into the locals that the QW_SCH_TASK_* log
  // macros consume.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;

  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;

  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;

  msg->refId = be64toh(msg->refId);
  int64_t  rId = msg->refId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
  qwMsg.connInfo.ahandle = pMsg->ahandle;
  qwMsg.connInfo.handle = pMsg->handle;

  // TODO: the actual cancel step (qwCancelTask) is disabled for now.

  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));

  return TSDB_CODE_SUCCESS;
}

// Handles a task-drop request.
//
// The ids in the STaskDropReq are converted to host byte order in place and
// forwarded to qwProcessDrop together with the connection handles and the RPC
// code of the incoming message. A code of TSDB_CODE_RPC_NETWORK_UNAVAIL is
// logged as a drop caused by a broken network link.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or
// a message shorter than STaskDropReq, or the error reported by qwProcessDrop.
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (!(node && qWorkerMgmt && pMsg)) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  STaskDropReq *msg = pMsg->pCont;

  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Decode in place, then snapshot into the locals that QW_FPARAMS() and the
  // QW_SCH_TASK_* log macros consume.
  msg->sId = be64toh(msg->sId);
  uint64_t sId = msg->sId;

  msg->queryId = be64toh(msg->queryId);
  uint64_t qId = msg->queryId;

  msg->taskId = be64toh(msg->taskId);
  uint64_t tId = msg->taskId;

  msg->refId = be64toh(msg->refId);
  int64_t  rId = msg->refId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
  qwMsg.connInfo.ahandle = pMsg->ahandle;
  qwMsg.connInfo.handle = pMsg->handle;

  if (pMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));    
  }

  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle);

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625
// Handles a scheduler heartbeat request.
//
// The request is deserialized from pMsg->pCont into a local SSchedulerHbReq
// and passed to qwProcessHb together with the connection handles and the RPC
// code of the incoming message. A code of TSDB_CODE_RPC_NETWORK_UNAVAIL is
// logged as a heartbeat caused by a broken network link.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments,
// a NULL body or a body that fails to deserialize, or the error reported by
// qwProcessHb.
//
// NOTE(review): `req` is only released here on the deserialize-failure path;
// presumably qwProcessHb takes care of it otherwise - confirm.
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (!(node && qWorkerMgmt && pMsg)) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  SSchedulerHbReq req = {0};

  if (NULL == pMsg->pCont) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Any non-zero result from the deserializer is treated as a malformed body.
  if (0 != tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // sId is consumed by the QW_SCH_* log macros below.
  uint64_t sId = req.sId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
  qwMsg.connInfo.ahandle = pMsg->ahandle;
  qwMsg.connInfo.handle = pMsg->handle;

  if (pMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));    
  }

  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle);

  QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
643 644 645 646 647 648 649
// Handles a show-tables request by replying through qwBuildAndSendShowRsp with
// response code 0. The request body itself is not inspected.
//
// Returns TSDB_CODE_QRY_INVALID_INPUT for NULL arguments, otherwise whatever
// qwBuildAndSendShowRsp returns (passed through QW_RET).
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (!(node && qWorkerMgmt && pMsg)) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t rspCode = 0;
  QW_RET(qwBuildAndSendShowRsp(pMsg, rspCode));
}

// Handles a show-tables fetch request by passing the message and its body
// (viewed as SVShowTablesFetchReq) to qwBuildAndSendShowFetchRsp.
//
// Returns TSDB_CODE_QRY_INVALID_INPUT for NULL arguments, otherwise whatever
// qwBuildAndSendShowFetchRsp returns (passed through QW_RET).
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (!(node && qWorkerMgmt && pMsg)) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SVShowTablesFetchReq *fetchReq = pMsg->pCont;
  QW_RET(qwBuildAndSendShowFetchRsp(pMsg, fetchReq));
}