qworkerMsg.c 18.3 KB
Newer Older
D
dapan1121 已提交
1
#include "qworker.h"
S
common  
Shengliang Guan 已提交
2
#include "tcommon.h"
D
dapan1121 已提交
3 4 5 6
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qworkerInt.h"
D
dapan1121 已提交
7
#include "qworkerMsg.h"
D
dapan1121 已提交
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#include "tmsg.h"
#include "tname.h"
#include "dataSinkMgt.h"


int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
  int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
  
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", msgSize);
    QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memset(pRsp, 0, sizeof(SRetrieveTableRsp));

  *rsp = pRsp;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
29 30


D
dapan1121 已提交
31
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
D
dapan1121 已提交
32 33
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
  
D
dapan1121 已提交
34
  rsp->useconds = htobe64(input->useconds);
D
dapan1121 已提交
35
  rsp->completed = qComplete;
D
dapan1121 已提交
36 37 38 39 40 41 42 43
  rsp->precision = input->precision;
  rsp->compressed = input->compressed;
  rsp->compLen = htonl(len);
  rsp->numOfRows = htonl(input->numOfRows);
}


void qwFreeFetchRsp(void *msg) {
D
dapan 已提交
44 45 46
  if (msg) {
    rpcFreeCont(msg);
  }
D
dapan1121 已提交
47 48
}

S
shm  
Shengliang Guan 已提交
49
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
D
dapan1121 已提交
50 51 52 53 54
  SQueryTableRsp rsp = {.code = code};
  
  int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
  void *msg = rpcMallocCont(contLen);
  tSerializeSQueryTableRsp(msg, contLen, &rsp);
D
dapan1121 已提交
55 56

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
57
    .msgType = TDMT_VND_QUERY_RSP,
D
dapan1121 已提交
58 59
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
D
dapan1121 已提交
60 61
    .pCont   = msg,
    .contLen = contLen,
D
dapan1121 已提交
62 63 64
    .code    = code,
  };

S
shm  
Shengliang Guan 已提交
65
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
66 67 68 69

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
70
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
D
dapan1121 已提交
71 72 73 74
  SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
75
    .msgType = TDMT_VND_RES_READY_RSP,
D
dapan1121 已提交
76
    .handle  = pConn->handle,
D
dapan1121 已提交
77
    .ahandle = NULL,
D
dapan1121 已提交
78 79 80 81 82
    .pCont   = pRsp,
    .contLen = sizeof(*pRsp),
    .code    = code,
  };

S
shm  
Shengliang Guan 已提交
83
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
84 85 86 87

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {  
  SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};

  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
  void *pRsp = rpcMallocCont(contLen);
  tSerializeSExplainRsp(pRsp, contLen, &rsp);

  SRpcMsg rpcRsp = {
    .msgType = TDMT_VND_EXPLAIN_RSP,
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .pCont   = pRsp,
    .contLen = contLen,
    .code    = 0,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
109
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
D
dapan1121 已提交
110 111 112
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
  void *pRsp = rpcMallocCont(contLen);
  tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
D
dapan1121 已提交
113 114

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
115
    .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
D
dapan1121 已提交
116 117
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
D
dapan1121 已提交
118
    .pCont   = pRsp,
D
dapan1121 已提交
119 120
    .contLen = contLen,
    .code    = code,
D
dapan1121 已提交
121 122
  };

S
shm  
Shengliang Guan 已提交
123
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
124 125 126 127

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
128
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
D
dapan1121 已提交
129 130 131 132 133 134 135
  if (NULL == pRsp) {
    pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    dataLength = 0;
  }

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
136
    .msgType = TDMT_VND_FETCH_RSP,
D
dapan1121 已提交
137 138
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
D
dapan1121 已提交
139 140 141 142 143
    .pCont   = pRsp,
    .contLen = sizeof(*pRsp) + dataLength,
    .code    = code,
  };

S
shm  
Shengliang Guan 已提交
144
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
145 146 147 148

  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
149
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
D
dapan1121 已提交
150 151 152 153
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
154
    .msgType = TDMT_VND_CANCEL_TASK_RSP,
D
dapan1121 已提交
155 156
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
D
dapan1121 已提交
157 158 159 160 161
    .pCont   = pRsp,
    .contLen = sizeof(*pRsp),
    .code    = code,
  };

S
shm  
Shengliang Guan 已提交
162
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
163 164 165
  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
166
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
D
dapan1121 已提交
167 168 169 170
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
  pRsp->code = code;

  SRpcMsg rpcRsp = {
D
dapan1121 已提交
171
    .msgType = TDMT_VND_DROP_TASK_RSP,
D
dapan1121 已提交
172 173
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
D
dapan1121 已提交
174 175 176 177 178
    .pCont   = pRsp,
    .contLen = sizeof(*pRsp),
    .code    = code,
  };

S
shm  
Shengliang Guan 已提交
179
  tmsgSendRsp(&rpcRsp);
D
dapan1121 已提交
180 181 182
  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
183
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
S
Shengliang Guan 已提交
184 185 186 187
  int32_t         numOfCols = 6;
  SVShowTablesRsp showRsp = {0};

  // showRsp.showId = 1;
wafwerar's avatar
wafwerar 已提交
188
  showRsp.tableMeta.pSchemas = taosMemoryCalloc(numOfCols, sizeof(SSchema));
S
Shengliang Guan 已提交
189 190 191 192
  if (showRsp.tableMeta.pSchemas == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
D
dapan1121 已提交
193

194
  col_id_t  cols = 0;
S
Shengliang Guan 已提交
195
  SSchema *pSchema = showRsp.tableMeta.pSchemas;
D
dapan1121 已提交
196 197

  const SSchema *s = tGetTbnameColumnSchema();
S
Shengliang Guan 已提交
198
  *pSchema = createSchema(s->type, s->bytes, ++cols, "name");
D
dapan1121 已提交
199 200 201
  pSchema++;

  int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
S
Shengliang Guan 已提交
202
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "created");
D
dapan1121 已提交
203 204 205
  pSchema++;

  type = TSDB_DATA_TYPE_SMALLINT;
S
Shengliang Guan 已提交
206
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "columns");
D
dapan1121 已提交
207 208
  pSchema++;

S
Shengliang Guan 已提交
209
  *pSchema = createSchema(s->type, s->bytes, ++cols, "stable");
D
dapan1121 已提交
210 211 212
  pSchema++;

  type = TSDB_DATA_TYPE_BIGINT;
S
Shengliang Guan 已提交
213
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "uid");
D
dapan1121 已提交
214 215 216
  pSchema++;

  type = TSDB_DATA_TYPE_INT;
S
Shengliang Guan 已提交
217
  *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "vgId");
D
dapan1121 已提交
218 219

  assert(cols == numOfCols);
S
Shengliang Guan 已提交
220 221 222 223 224
  showRsp.tableMeta.numOfColumns = cols;

  int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
  void   *pBuf = rpcMallocCont(bufLen);
  tSerializeSShowRsp(pBuf, bufLen, &showRsp);
D
dapan1121 已提交
225 226

  SRpcMsg rpcMsg = {
S
Shengliang Guan 已提交
227
      .handle = pMsg->handle,
D
dapan1121 已提交
228
      .ahandle = pMsg->ahandle,
S
Shengliang Guan 已提交
229 230 231
      .pCont = pBuf,
      .contLen = bufLen,
      .code = code,
D
dapan1121 已提交
232 233
  };

S
shm  
Shengliang Guan 已提交
234
  tmsgSendRsp(&rpcMsg);
D
dapan1121 已提交
235 236 237
  return TSDB_CODE_SUCCESS;
}

S
shm  
Shengliang Guan 已提交
238
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
D
dapan1121 已提交
239 240 241 242 243 244 245 246 247 248 249 250
  SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
  int32_t handle = htonl(pFetchReq->id);

  pRsp->numOfRows = 0;
  SRpcMsg rpcMsg = {
      .handle  = pMsg->handle,
      .ahandle = pMsg->ahandle,
      .pCont   = pRsp,
      .contLen = sizeof(*pRsp),
      .code    = 0,
  };

S
shm  
Shengliang Guan 已提交
251
  tmsgSendRsp(&rpcMsg);
D
dapan1121 已提交
252 253 254
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
255
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
D
dapan1121 已提交
256 257 258 259 260 261 262 263 264 265 266 267
  SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  req->header.vgId = mgmt->nodeId;
  req->sId = sId;
  req->queryId = qId;
  req->taskId = tId;

  SRpcMsg pNewMsg = {
D
dapan1121 已提交
268 269
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
D
dapan1121 已提交
270 271 272 273 274 275
    .msgType = TDMT_VND_QUERY_CONTINUE,
    .pCont   = req,
    .contLen = sizeof(SQueryContinueReq),
    .code    = 0,
  };

S
Shengliang Guan 已提交
276
  int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
D
dapan1121 已提交
277
  if (TSDB_CODE_SUCCESS != code) {
H
Haojun Liao 已提交
278
    QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
D
dapan1121 已提交
279 280 281 282
    rpcFreeCont(req);
    QW_ERR_RET(code);
  }

D
dapan1121 已提交
283
  QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);
D
dapan1121 已提交
284 285 286 287

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
288

D
dapan1121 已提交
289
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
D
dapan1121 已提交
290 291 292 293 294 295
  STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }  

D
dapan1121 已提交
296 297 298 299 300
  req->header.vgId = htonl(mgmt->nodeId);
  req->sId = htobe64(sId);
  req->queryId = htobe64(qId);
  req->taskId = htobe64(tId);
  req->refId = htobe64(rId);
D
dapan1121 已提交
301 302 303 304 305 306 307 308 309 310
  
  SRpcMsg pMsg = {
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .msgType = TDMT_VND_DROP_TASK,
    .pCont   = req,
    .contLen = sizeof(STaskDropReq),
    .code    = TSDB_CODE_RPC_NETWORK_UNAVAIL,
  };
  
S
Shengliang Guan 已提交
311
  tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);
D
dapan1121 已提交
312 313 314 315

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn) {
  SSchedulerHbReq req = {0};
  req.header.vgId = mgmt->nodeId;
  req.sId = sId;

  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("calloc %d failed", msgSize);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    taosMemoryFree(msg);
    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  
  SRpcMsg pMsg = {
    .handle  = pConn->handle,
    .ahandle = pConn->ahandle,
    .msgType = TDMT_VND_QUERY_HEARTBEAT,
    .pCont   = msg,
D
dapan1121 已提交
342
    .contLen = msgSize,
D
dapan1121 已提交
343 344 345 346 347 348 349 350 351
    .code    = TSDB_CODE_RPC_NETWORK_UNAVAIL,
  };
  
  tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
352

D
dapan1121 已提交
353 354 355 356 357 358 359 360 361 362
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SSubQueryMsg *msg = pMsg->pCont;
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  
  if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
D
dapan1121 已提交
363
    QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
364
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
365 366
  }

367
  msg->sId     = be64toh(msg->sId);
D
dapan1121 已提交
368
  msg->queryId = be64toh(msg->queryId);
369
  msg->taskId  = be64toh(msg->taskId);
D
dapan1121 已提交
370
  msg->refId   = be64toh(msg->refId);
371 372 373
  msg->phyLen  = ntohl(msg->phyLen);
  msg->sqlLen  = ntohl(msg->sqlLen);

D
dapan1121 已提交
374 375 376
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
377
  int64_t  rId = msg->refId;
D
dapan1121 已提交
378

D
dapan1121 已提交
379 380 381
  SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen};
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;
D
dapan1121 已提交
382

383
  char* sql = strndup(msg->msg, msg->sqlLen);
D
dapan1121 已提交
384
  QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql);
wafwerar's avatar
wafwerar 已提交
385
  taosMemoryFreeClear(sql);
386

D
dapan1121 已提交
387
  QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));
388

D
dapan1121 已提交
389
  QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
390 391

  return TSDB_CODE_SUCCESS;  
D
dapan1121 已提交
392 393
}

394
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
D
dapan1121 已提交
395 396 397
  int32_t code = 0;
  int8_t status = 0;
  bool queryDone = false;
398
  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
D
dapan1121 已提交
399 400
  bool needStop = false;
  SQWTaskCtx *handles = NULL;
D
dapan1121 已提交
401
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
D
dapan1121 已提交
402

D
dapan1121 已提交
403
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
404
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
405
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
406 407
  }

408 409 410
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
411
  int64_t rId = 0;
D
dapan1121 已提交
412

D
dapan1121 已提交
413 414 415
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;
D
dapan1121 已提交
416

D
dapan1121 已提交
417
  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle);
418 419 420

  QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
421
  QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);
422 423

  return TSDB_CODE_SUCCESS;    
D
dapan1121 已提交
424 425 426 427 428 429 430
}

int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

D
dapan1121 已提交
431
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
D
dapan1121 已提交
432 433
  SResReadyReq *msg = pMsg->pCont;
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
434
    QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
435 436 437
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }  

D
dapan1121 已提交
438 439 440 441 442 443 444
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
445
  int64_t rId = 0;
D
dapan1121 已提交
446

D
dapan1121 已提交
447 448 449
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;
D
dapan1121 已提交
450

D
dapan1121 已提交
451
  QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle);
452

D
dapan1121 已提交
453
  QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg));
454

D
dapan1121 已提交
455
  QW_SCH_TASK_DLOG("processReady end, node:%p", node);
D
dapan1121 已提交
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
  
  return TSDB_CODE_SUCCESS;
}

int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t code = 0;
  SSchTasksStatusReq *msg = pMsg->pCont;
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    qError("invalid task status msg");
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }  

D
dapan1121 已提交
472
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
D
dapan1121 已提交
473
  msg->sId = htobe64(msg->sId);
D
dapan1121 已提交
474
  uint64_t sId = msg->sId;
D
dapan1121 已提交
475 476 477

  SSchedulerStatusRsp *sStatus = NULL;
  
D
dapan1121 已提交
478
  //QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
D
dapan1121 已提交
479 480 481

_return:

D
dapan1121 已提交
482
  //QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
D
dapan1121 已提交
483 484 485 486 487 488 489 490 491 492 493 494 495

  return TSDB_CODE_SUCCESS;
}

int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SResFetchReq *msg = pMsg->pCont;
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
496
    QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);  
D
dapan1121 已提交
497 498 499
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }  

D
dapan1121 已提交
500 501 502
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
D
dapan1121 已提交
503 504 505 506

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
507
  int64_t rId = 0;
D
dapan1121 已提交
508

D
dapan1121 已提交
509 510 511
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;
D
dapan1121 已提交
512

D
dapan1121 已提交
513
  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle);
514 515 516

  QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
517
  QW_SCH_TASK_DLOG("processFetch end, node:%p", node);
518 519

  return TSDB_CODE_SUCCESS;  
D
dapan1121 已提交
520 521
}

S
Shengliang 已提交
522
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
523
  qProcessFetchRsp(NULL, pMsg, NULL);
S
Shengliang 已提交
524 525 526
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
527 528 529 530 531
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

D
dapan1121 已提交
532
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
D
dapan1121 已提交
533 534 535 536 537 538 539
  int32_t code = 0;
  STaskCancelReq *msg = pMsg->pCont;
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
    qError("invalid task cancel msg");  
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }  

D
dapan1121 已提交
540 541 542 543
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
544

D
dapan1121 已提交
545 546 547 548 549
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
  int64_t  rId = msg->refId;

D
dapan1121 已提交
550 551 552 553
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;

D
dapan1121 已提交
554
  //QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
D
dapan1121 已提交
555 556 557

_return:

S
shm  
Shengliang Guan 已提交
558
  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
D
dapan1121 已提交
559
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
D
dapan1121 已提交
560 561 562 563 564 565 566 567 568 569 570 571 572 573

  return TSDB_CODE_SUCCESS;
}

int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t code = 0;
  STaskDropReq *msg = pMsg->pCont;
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
D
dapan1121 已提交
574
    QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
D
dapan1121 已提交
575 576 577
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }  

D
dapan1121 已提交
578 579 580
  msg->sId = be64toh(msg->sId);
  msg->queryId = be64toh(msg->queryId);
  msg->taskId = be64toh(msg->taskId);
D
dapan1121 已提交
581
  msg->refId = be64toh(msg->refId);
D
dapan1121 已提交
582 583 584 585

  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t tId = msg->taskId;
D
dapan1121 已提交
586
  int64_t  rId = msg->refId;
D
dapan1121 已提交
587

D
dapan1121 已提交
588
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
D
dapan1121 已提交
589 590
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;
D
dapan1121 已提交
591

D
dapan1121 已提交
592 593 594
  if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));    
  }
D
dapan1121 已提交
595

D
dapan1121 已提交
596
  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle);
597 598 599

  QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));

D
dapan1121 已提交
600
  QW_SCH_TASK_DLOG("processDrop end, node:%p", node);
601 602

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
603 604
}

D
dapan1121 已提交
605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t code = 0;
  SSchedulerHbReq req = {0};
  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
  
  if (NULL == pMsg->pCont) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }  

  if (tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    tFreeSSchedulerHbReq(&req);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  uint64_t sId = req.sId;
D
dapan1121 已提交
626
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
D
dapan1121 已提交
627 628
  qwMsg.connInfo.handle = pMsg->handle;
  qwMsg.connInfo.ahandle = pMsg->ahandle;
D
dapan1121 已提交
629

D
dapan1121 已提交
630 631 632 633
  if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));    
  }

D
dapan1121 已提交
634
  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle);
D
dapan1121 已提交
635 636 637 638 639 640 641 642

  QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));

  QW_SCH_DLOG("processHb end, node:%p", node);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
643 644 645 646 647 648 649
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t code = 0;
  SVShowTablesReq *pReq = pMsg->pCont;
S
shm  
Shengliang Guan 已提交
650
  QW_RET(qwBuildAndSendShowRsp(pMsg, code));
D
dapan1121 已提交
651 652 653 654 655 656 657 658
}

int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
S
shm  
Shengliang Guan 已提交
659
  QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
D
dapan1121 已提交
660
}