From df1ebbc25ba497fe2500b81d9c58c9ef71850668 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 24 Apr 2022 21:51:27 +0800 Subject: [PATCH] refactor --- source/libs/qworker/src/qworkerMsg.c | 288 ++++++++++++++------------- 1 file changed, 150 insertions(+), 138 deletions(-) diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index e0bcfabdd3..64df4f02bf 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -1,18 +1,17 @@ -#include "qworker.h" -#include "tcommon.h" +#include "qworkerMsg.h" +#include "dataSinkMgt.h" #include "executor.h" #include "planner.h" #include "query.h" +#include "qworker.h" #include "qworkerInt.h" -#include "qworkerMsg.h" +#include "tcommon.h" #include "tmsg.h" #include "tname.h" -#include "dataSinkMgt.h" - int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { int32_t msgSize = sizeof(SRetrieveTableRsp) + length; - + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize); if (NULL == pRsp) { qError("rpcMallocCont %d failed", msgSize); @@ -26,11 +25,9 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { return TSDB_CODE_SUCCESS; } - - void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - + rsp->useconds = htobe64(input->useconds); rsp->completed = qComplete; rsp->precision = input->precision; @@ -39,7 +36,6 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) rsp->numOfRows = htonl(input->numOfRows); } - void qwFreeFetchRsp(void *msg) { if (msg) { rpcFreeCont(msg); @@ -48,18 +44,19 @@ void qwFreeFetchRsp(void *msg) { int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { SQueryTableRsp rsp = {.code = code}; - + int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); - void *msg = rpcMallocCont(contLen); + void *msg = rpcMallocCont(contLen); tSerializeSQueryTableRsp(msg, contLen, &rsp); SRpcMsg rpcRsp = { - .msgType = TDMT_VND_QUERY_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .pCont = msg, - .contLen = contLen, - .code = code, + .msgType = TDMT_VND_QUERY_RSP, + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .refId = pConn->refId, + .pCont = msg, + .contLen = contLen, + .code = code, }; tmsgSendRsp(&rpcRsp); @@ -72,12 +69,13 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { pRsp->code = code; SRpcMsg rpcRsp = { - .msgType = TDMT_VND_RES_READY_RSP, - .handle = pConn->handle, - .ahandle = NULL, - .pCont = pRsp, - .contLen = sizeof(*pRsp), - .code = code, + .msgType = TDMT_VND_RES_READY_RSP, + .handle = pConn->handle, + .refId = pConn->refId, + .ahandle = NULL, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, }; tmsgSendRsp(&rpcRsp); @@ -85,20 +83,21 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num) { +int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num) { SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo}; int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); - void *pRsp = rpcMallocCont(contLen); + void *pRsp = rpcMallocCont(contLen); tSerializeSExplainRsp(pRsp, contLen, &rsp); SRpcMsg rpcRsp = { - .msgType = TDMT_VND_EXPLAIN_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .pCont = pRsp, - .contLen = contLen, - .code = 0, + .msgType = TDMT_VND_EXPLAIN_RSP, + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .refId = pConn->refId, + .pCont = pRsp, + .contLen = contLen, + .code = 0, }; tmsgSendRsp(&rpcRsp); @@ -108,16 +107,17 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); - void *pRsp = rpcMallocCont(contLen); + void *pRsp = rpcMallocCont(contLen); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); SRpcMsg rpcRsp = { - .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .pCont = pRsp, - .contLen = contLen, - .code = code, + .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP, + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .refId = pConn->refId, + .pCont = pRsp, + .contLen = contLen, + .code = code, }; tmsgSendRsp(&rpcRsp); @@ -133,12 +133,13 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3 } SRpcMsg rpcRsp = { - .msgType = TDMT_VND_FETCH_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .pCont = pRsp, - .contLen = sizeof(*pRsp) + dataLength, - .code = code, + .msgType = TDMT_VND_FETCH_RSP, + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .refId = pConn->refId, + .pCont = pRsp, + .contLen = sizeof(*pRsp) + dataLength, + .code = code, }; tmsgSendRsp(&rpcRsp); @@ -151,12 +152,13 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { pRsp->code = code; SRpcMsg rpcRsp = { - .msgType = TDMT_VND_CANCEL_TASK_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .pCont = pRsp, - .contLen = sizeof(*pRsp), - .code = code, + .msgType = TDMT_VND_CANCEL_TASK_RSP, + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .refId = pConn->refId, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, }; tmsgSendRsp(&rpcRsp); @@ -168,12 +170,13 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { pRsp->code = code; SRpcMsg rpcRsp = { - .msgType = TDMT_VND_DROP_TASK_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .pCont = pRsp, - .contLen = sizeof(*pRsp), - .code = code, + .msgType = TDMT_VND_DROP_TASK_RSP, + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .refId = pConn->refId, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, }; tmsgSendRsp(&rpcRsp); @@ -191,7 +194,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { return -1; } - col_id_t cols = 0; + col_id_t cols = 0; SSchema *pSchema = showRsp.tableMeta.pSchemas; const SSchema *s = tGetTbnameColumnSchema(); @@ -226,6 +229,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rpcMsg = { .handle = pMsg->handle, .ahandle = pMsg->ahandle, + .refId = pMsg->refId, .pCont = pBuf, .contLen = bufLen, .code = code, @@ -235,17 +239,18 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { +int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchReq) { SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp)); - int32_t handle = htonl(pFetchReq->id); + int32_t handle = htonl(pFetchReq->id); pRsp->numOfRows = 0; SRpcMsg rpcMsg = { - .handle = pMsg->handle, + .handle = pMsg->handle, .ahandle = pMsg->ahandle, - .pCont = pRsp, + .refId = pMsg->refId, + .pCont = pRsp, .contLen = sizeof(*pRsp), - .code = 0, + .code = 0, }; tmsgSendRsp(&rpcMsg); @@ -253,7 +258,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe } int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { - SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); + SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); if (NULL == req) { QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -265,12 +270,13 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { req->taskId = tId; SRpcMsg pNewMsg = { - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .msgType = TDMT_VND_QUERY_CONTINUE, - .pCont = req, - .contLen = sizeof(SQueryContinueReq), - .code = 0, + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .msgType = TDMT_VND_QUERY_CONTINUE, + .refId = pConn->refId, + .pCont = req, + .contLen = sizeof(SQueryContinueReq), + .code = 0, }; int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg); @@ -285,29 +291,29 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { return TSDB_CODE_SUCCESS; } - int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { - STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); + STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); if (NULL == req) { QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } + } req->header.vgId = htonl(mgmt->nodeId); req->sId = htobe64(sId); req->queryId = htobe64(qId); req->taskId = htobe64(tId); req->refId = htobe64(rId); - + SRpcMsg pMsg = { - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .msgType = TDMT_VND_DROP_TASK, - .pCont = req, - .contLen = sizeof(STaskDropReq), - .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .refId = pConn->refId, + .msgType = TDMT_VND_DROP_TASK, + .pCont = req, + .contLen = sizeof(STaskDropReq), + .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, }; - + tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); return TSDB_CODE_SUCCESS; @@ -333,43 +339,42 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo taosMemoryFree(msg); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + SRpcMsg pMsg = { - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .msgType = TDMT_VND_QUERY_HEARTBEAT, - .pCont = msg, - .contLen = msgSize, - .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .refId = pConn->refId, + .msgType = TDMT_VND_QUERY_HEARTBEAT, + .pCont = msg, + .contLen = msgSize, + .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, }; - + tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); return TSDB_CODE_SUCCESS; } - - int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t code = 0; + int32_t code = 0; SSubQueryMsg *msg = pMsg->pCont; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; - + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = be64toh(msg->sId); + msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); - msg->taskId = be64toh(msg->taskId); - msg->refId = be64toh(msg->refId); - msg->phyLen = ntohl(msg->phyLen); - msg->sqlLen = ntohl(msg->sqlLen); + msg->taskId = be64toh(msg->taskId); + msg->refId = be64toh(msg->refId); + msg->phyLen = ntohl(msg->phyLen); + msg->sqlLen = ntohl(msg->sqlLen); uint64_t sId = msg->sId; uint64_t qId = msg->queryId; @@ -379,8 +384,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen}; qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; + qwMsg.connInfo.refId = pMsg->refId; - char* sql = strndup(msg->msg, msg->sqlLen); + char *sql = strndup(msg->msg, msg->sqlLen); QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql); taosMemoryFreeClear(sql); @@ -388,17 +394,17 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_SCH_TASK_DLOG("processQuery end, node:%p", node); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; } int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { - int32_t code = 0; - int8_t status = 0; - bool queryDone = false; + int32_t code = 0; + int8_t status = 0; + bool queryDone = false; SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; - bool needStop = false; - SQWTaskCtx *handles = NULL; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + bool needStop = false; + SQWTaskCtx *handles = NULL; + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -408,11 +414,12 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; - int64_t rId = 0; + int64_t rId = 0; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; + qwMsg.connInfo.refId = pMsg->refId; QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle); @@ -420,10 +427,10 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_SCH_TASK_DLOG("processCQuery end, node:%p", node); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ +int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -433,7 +440,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } + } msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); @@ -442,18 +449,19 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; - int64_t rId = 0; + int64_t rId = 0; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; + qwMsg.connInfo.refId = pMsg->refId; QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle); QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg)); QW_SCH_TASK_DLOG("processReady end, node:%p", node); - + return TSDB_CODE_SUCCESS; } @@ -462,24 +470,24 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - int32_t code = 0; + int32_t code = 0; SSchTasksStatusReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } + } SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; msg->sId = htobe64(msg->sId); uint64_t sId = msg->sId; SSchedulerStatusRsp *sStatus = NULL; - - //QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus)); + + // QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus)); _return: - //QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); + // QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); return TSDB_CODE_SUCCESS; } @@ -491,11 +499,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SResFetchReq *msg = pMsg->pCont; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; - + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { - QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen); + QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } + } msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); @@ -504,11 +512,12 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; - int64_t rId = 0; + int64_t rId = 0; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; + qwMsg.connInfo.refId = pMsg->refId; QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle); @@ -516,7 +525,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_SCH_TASK_DLOG("processFetch end, node:%p", node); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; } int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { @@ -529,13 +538,13 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; - int32_t code = 0; + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { - qError("invalid task cancel msg"); + qError("invalid task cancel msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } + } msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); @@ -550,8 +559,9 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; + qwMsg.connInfo.refId = pMsg->refId; - //QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); + // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); _return: @@ -566,14 +576,14 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - int32_t code = 0; + int32_t code = 0; STaskDropReq *msg = pMsg->pCont; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; - + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } + } msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); @@ -588,9 +598,10 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code}; qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; + qwMsg.connInfo.refId = pMsg->refId; if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { - QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code)); + QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code)); } QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle); @@ -607,14 +618,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - int32_t code = 0; + int32_t code = 0; SSchedulerHbReq req = {0}; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; - + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + if (NULL == pMsg->pCont) { QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } + } if (tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) { QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); @@ -623,12 +634,13 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } uint64_t sId = req.sId; - SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code}; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code}; qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; + qwMsg.connInfo.refId = pMsg->refId; if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { - QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code)); + QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code)); } QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle); @@ -645,7 +657,7 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - int32_t code = 0; + int32_t code = 0; SVShowTablesReq *pReq = pMsg->pCont; QW_RET(qwBuildAndSendShowRsp(pMsg, code)); } -- GitLab