diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 556c4a9a1ff6e82491c17dedbb8938814b5ae103..02d4c2279c543aab42f1d0dd933a282d99ef7386 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1615,15 +1615,21 @@ typedef struct SSubQueryMsg { uint64_t taskId; int64_t refId; int32_t execId; + int32_t msgMask; int8_t taskType; int8_t explain; int8_t needFetch; - uint32_t sqlLen; // the query sql, - uint32_t phyLen; - int32_t msgMask; - char msg[]; + uint32_t sqlLen; + char *sql; + uint32_t msgLen; + char *msg; } SSubQueryMsg; +int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq); +int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq); +void tFreeSSubQueryMsg(SSubQueryMsg *pReq); + + typedef struct { SMsgHead header; uint64_t sId; @@ -1732,6 +1738,13 @@ typedef struct { int32_t execId; } STaskDropReq; +int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq); +int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq); + +int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); +int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); + + typedef struct { int32_t code; } STaskDropRsp; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 52d9c5b1991001ecf25dc53465d64bdb14166a49..35900638dd9f3b1c03f13744ce1ceb0f48a11129 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4643,6 +4643,178 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { return 0; } +int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } + + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeU64(&encoder, pReq->sId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1; + if (tEncodeI64(&encoder, pReq->refId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->execId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->msgMask) < 0) return -1; + if (tEncodeI8(&encoder, pReq->taskType) < 0) return -1; + if (tEncodeI8(&encoder, pReq->explain) < 0) return -1; + if (tEncodeI8(&encoder, pReq->needFetch) < 0) return -1; + if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1; + if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; + if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1; + if (tEncodeBinary(&encoder, (uint8_t*)pReq->msg, pReq->msgLen) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); + } + + return tlen + headLen; +} + +int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) { + int32_t headLen = sizeof(SMsgHead); + + SMsgHead *pHead = buf; + pHead->vgId = pReq->header.vgId; + pHead->contLen = pReq->header.contLen; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->msgMask) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->taskType) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->explain) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->needFetch) < 0) return -1; + if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1; + if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; + if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1; + if (tDecodeBinaryAlloc(&decoder, (void**)&pReq->msg, NULL) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +void tFreeSSubQueryMsg(SSubQueryMsg *pReq) { + if (NULL == pReq) { + return; + } + + taosMemoryFreeClear(pReq->sql); + taosMemoryFreeClear(pReq->msg); +} + +int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } + + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeU64(&encoder, pReq->sId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1; + if (tEncodeI64(&encoder, pReq->refId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->execId) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); + } + + return tlen + headLen; +} + +int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + + SMsgHead *pHead = buf; + pHead->vgId = pReq->header.vgId; + pHead->contLen = pReq->header.contLen; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI32(&encoder, pRsp->code) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->tbFName) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->sversion) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->tversion) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->affectedRows) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + return tlen; +} + +int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->tbFName) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t headLen = sizeof(SMsgHead); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 7e7f71b1764f79f6c6b52a77d8e981d285f12add..bb8a7cd140fa22992127d8c40f71d5fdbb0f251b 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -65,19 +65,37 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) { STbVerInfo *tbInfo = ctx ? &ctx->tbInfo : NULL; int64_t affectedRows = ctx ? ctx->affectedRows : 0; - SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = htonl(code); - pRsp->affectedRows = htobe64(affectedRows); + SQueryTableRsp rsp = {0}; + rsp.code = code; + rsp.affectedRows = affectedRows; + if (tbInfo) { - strcpy(pRsp->tbFName, tbInfo->tbFName); - pRsp->sversion = htonl(tbInfo->sversion); - pRsp->tversion = htonl(tbInfo->tversion); + strcpy(rsp.tbFName, tbInfo->tbFName); + rsp.sversion = tbInfo->sversion; + rsp.tversion = tbInfo->tversion; + } + + int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp); + if (msgSize < 0) { + qError("tSerializeSQueryTableRsp failed"); + QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + void *pRsp = rpcMallocCont(msgSize); + if (NULL == pRsp) { + qError("rpcMallocCont %d failed", msgSize); + QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (tSerializeSQueryTableRsp(pRsp, msgSize, &rsp) < 0) { + qError("tSerializeSQueryTableRsp %d failed", msgSize); + QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } SRpcMsg rpcRsp = { .msgType = rspType, .pCont = pRsp, - .contLen = sizeof(*pRsp), + .contLen = msgSize, .code = code, .info = *pConn, }; @@ -182,23 +200,37 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) { #endif int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { - STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); - if (NULL == req) { - QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); + STaskDropReq qMsg; + qMsg.header.vgId = mgmt->nodeId; + qMsg.header.contLen = 0; + qMsg.sId = sId; + qMsg.queryId = qId; + qMsg.taskId = tId; + qMsg.refId = rId; + qMsg.execId = eId; + + int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg); + if (msgSize < 0) { + QW_SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + void *msg = rpcMallocCont(msgSize); + if (NULL == msg) { + QW_SCH_TASK_ELOG("rpcMallocCont %d failed", msgSize); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) { + QW_SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize); + rpcFreeCont(msg); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - - req->header.vgId = mgmt->nodeId; - req->sId = sId; - req->queryId = qId; - req->taskId = tId; - req->refId = rId; - req->execId = eId; SRpcMsg pNewMsg = { .msgType = TDMT_SCH_DROP_TASK, - .pCont = req, - .contLen = sizeof(STaskDropReq), + .pCont = msg, + .contLen = msgSize, .code = 0, .info = *pConn, }; @@ -247,22 +279,37 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { } int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { - STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); - if (NULL == req) { - QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); + STaskDropReq qMsg; + qMsg.header.vgId = mgmt->nodeId; + qMsg.header.contLen = 0; + qMsg.sId = sId; + qMsg.queryId = qId; + qMsg.taskId = tId; + qMsg.refId = rId; + qMsg.execId = eId; + + int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg); + if (msgSize < 0) { + QW_SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + void *msg = rpcMallocCont(msgSize); + if (NULL == msg) { + QW_SCH_TASK_ELOG("rpcMallocCont %d failed", msgSize); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) { + QW_SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize); + rpcFreeCont(msg); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - - req->header.vgId = htonl(mgmt->nodeId); - req->sId = htobe64(sId); - req->queryId = htobe64(qId); - req->taskId = htobe64(tId); - req->refId = htobe64(rId); SRpcMsg brokenMsg = { .msgType = TDMT_SCH_DROP_TASK, - .pCont = req, - .contLen = sizeof(STaskDropReq), + .pCont = msg, + .contLen = msgSize, .code = TSDB_CODE_RPC_BROKEN_LINK, .info = *pConn, }; @@ -312,40 +359,33 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran } int32_t code = 0; - SSubQueryMsg *msg = pMsg->pCont; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { - QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); + SSubQueryMsg msg = {0}; + if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) { + QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = be64toh(msg->sId); - msg->queryId = be64toh(msg->queryId); - msg->taskId = be64toh(msg->taskId); - msg->refId = be64toh(msg->refId); - msg->execId = ntohl(msg->execId); - msg->phyLen = ntohl(msg->phyLen); - msg->sqlLen = ntohl(msg->sqlLen); - msg->msgMask = ntohl(msg->msgMask); - - if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg->msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) { - QW_ELOG("query failed cause of grant expired, msgMask:%d", msg->msgMask); + if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg.msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) { + QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask); + tFreeSSubQueryMsg(&msg); QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); } - uint64_t sId = msg->sId; - uint64_t qId = msg->queryId; - uint64_t tId = msg->taskId; - int64_t rId = msg->refId; - int32_t eId = msg->execId; + uint64_t sId = msg.sId; + uint64_t qId = msg.queryId; + uint64_t tId = msg.taskId; + int64_t rId = msg.refId; + int32_t eId = msg.execId; SQWMsg qwMsg = { - .msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; + .msgType = pMsg->msgType, .msg = msg.msg, .msgLen = msg.msgLen, .connInfo = pMsg->info}; - QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle); - QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg)); - QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle); + QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p, SQL:%s", pMsg->info.handle, msg.sql); + code = qwPreprocessQuery(QW_FPARAMS(), &qwMsg); + QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code); + + tFreeSSubQueryMsg(&msg); return TSDB_CODE_SUCCESS; } @@ -355,19 +395,25 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SSubQueryMsg *msg = pMsg->pCont; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + SSubQueryMsg msg = {0}; + if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) { + QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } - uint64_t sId = msg->sId; - uint64_t qId = msg->queryId; - uint64_t tId = msg->taskId; - int64_t rId = msg->refId; - int32_t eId = msg->execId; + uint64_t sId = msg.sId; + uint64_t qId = msg.queryId; + uint64_t tId = msg.taskId; + int64_t rId = msg.refId; + int32_t eId = msg.execId; QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); qwAbortPrerocessQuery(QW_FPARAMS()); QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle); + tFreeSSubQueryMsg(&msg); + return TSDB_CODE_SUCCESS; } @@ -377,42 +423,41 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int } int32_t code = 0; - SSubQueryMsg *msg = pMsg->pCont; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1); - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { - QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); + SSubQueryMsg msg = {0}; + if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) { + QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - uint64_t sId = msg->sId; - uint64_t qId = msg->queryId; - uint64_t tId = msg->taskId; - int64_t rId = msg->refId; - int32_t eId = msg->execId; + uint64_t sId = msg.sId; + uint64_t qId = msg.queryId; + uint64_t tId = msg.taskId; + int64_t rId = msg.refId; + int32_t eId = msg.execId; SQWMsg qwMsg = {.node = node, - .msg = msg->msg + msg->sqlLen, - .msgLen = msg->phyLen, + .msg = msg.msg, + .msgLen = msg.msgLen, .connInfo = pMsg->info, .msgType = pMsg->msgType}; - qwMsg.msgInfo.explain = msg->explain; - qwMsg.msgInfo.taskType = msg->taskType; - qwMsg.msgInfo.needFetch = msg->needFetch; + qwMsg.msgInfo.explain = msg.explain; + qwMsg.msgInfo.taskType = msg.taskType; + qwMsg.msgInfo.needFetch = msg.needFetch; - char *sql = strndup(msg->msg, msg->sqlLen); QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), - pMsg->info.handle, sql); - QW_ERR_JRET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql)); + pMsg->info.handle, msg.sql); + code = qwProcessQuery(QW_FPARAMS(), &qwMsg, msg.sql); + msg.sql = NULL; + QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code); -_return: - - QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%d", node, code); + tFreeSSubQueryMsg(&msg); - return code; + return TSDB_CODE_SUCCESS; } int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { @@ -548,28 +593,22 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 } int32_t code = 0; - STaskDropReq *msg = pMsg->pCont; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1); - if (NULL == msg || pMsg->contLen < sizeof(*msg)) { - QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen); + STaskDropReq msg = {0}; + if (tDeserializeSTaskDropReq(pMsg->pCont, pMsg->contLen, &msg) < 0) { + QW_ELOG("tDeserializeSTaskDropReq failed, contLen:%d", pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = be64toh(msg->sId); - msg->queryId = be64toh(msg->queryId); - msg->taskId = be64toh(msg->taskId); - msg->refId = be64toh(msg->refId); - msg->execId = ntohl(msg->execId); - - uint64_t sId = msg->sId; - uint64_t qId = msg->queryId; - uint64_t tId = msg->taskId; - int64_t rId = msg->refId; - int32_t eId = msg->execId; + uint64_t sId = msg.sId; + uint64_t qId = msg.queryId; + uint64_t tId = msg.taskId; + int64_t rId = msg.refId; + int32_t eId = msg.execId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info}; diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 6078a2a3ac44b226e722c253521031afc0fb4eda..8a48977c777af8d794a38ed1721319c4b0646953 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -114,7 +114,7 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) { qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1)); qwtqueryMsg.sId = htobe64(1); qwtqueryMsg.taskId = htobe64(1); - qwtqueryMsg.phyLen = htonl(100); + qwtqueryMsg.msgLen = htonl(100); qwtqueryMsg.sqlLen = 0; queryRpc->msgType = TDMT_SCH_QUERY; queryRpc->pCont = &qwtqueryMsg; @@ -131,12 +131,29 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { } void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { - dropMsg->sId = htobe64(1); - dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId)); - dropMsg->taskId = htobe64(1); + dropMsg->sId = 1; + dropMsg->queryId = atomic_load_64(&qwtTestQueryId); + dropMsg->taskId = 1; + + int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg); + if (msgSize < 0) { + return; + } + + char *msg = (char*)taosMemoryCalloc(1, msgSize); + if (NULL == msg) { + return; + } + + if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) { + taosMemoryFree(msg); + return; + } + + dropRpc->msgType = TDMT_SCH_DROP_TASK; - dropRpc->pCont = dropMsg; - dropRpc->contLen = sizeof(STaskDropReq); + dropRpc->pCont = msg; + dropRpc->contLen = msgSize; } int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index a6a2a6c301bfc0e079fedd88fe247ba16264b034..b0bc0df8506b2c8bd704fde26f103dcab2b6a61f 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -334,17 +334,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - SQueryTableRsp *rsp = (SQueryTableRsp *)msg; - rsp->code = ntohl(rsp->code); - rsp->sversion = ntohl(rsp->sversion); - rsp->tversion = ntohl(rsp->tversion); - rsp->affectedRows = be64toh(rsp->affectedRows); - - SCH_ERR_JRET(rsp->code); + SQueryTableRsp rsp = {0}; + if (tDeserializeSQueryTableRsp(msg, msgSize, &rsp) < 0) { + SCH_TASK_ELOG("tDeserializeSQueryTableRsp failed, msgSize:%d", msgSize); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_MSG); + } + + SCH_ERR_JRET(rsp.code); - SCH_ERR_JRET(schSaveJobExecRes(pJob, rsp)); + SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp)); - atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); + atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); taosMemoryFreeClear(msg); @@ -1042,30 +1042,40 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, case TDMT_SCH_MERGE_QUERY: { SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx)); - uint32_t len = strlen(pJob->sql); - msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len; + SSubQueryMsg qMsg; + qMsg.header.vgId = addr->nodeId; + qMsg.header.contLen = 0; + qMsg.sId = schMgmt.sId; + qMsg.queryId = pJob->queryId; + qMsg.taskId = pTask->taskId; + qMsg.refId = pJob->refId; + qMsg.execId = pTask->execId; + qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0; + qMsg.taskType = TASK_TYPE_TEMP; + qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob); + qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask); + qMsg.sqlLen = strlen(pJob->sql); + qMsg.sql = pJob->sql; + qMsg.msgLen = pTask->msgLen; + qMsg.msg = pTask->msg; + + msgSize = tSerializeSSubQueryMsg(NULL, 0, &qMsg); + if (msgSize < 0) { + SCH_TASK_ELOG("tSerializeSSubQueryMsg get size, msgSize:%d", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SSubQueryMsg *pMsg = msg; - pMsg->header.vgId = htonl(addr->nodeId); - pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(pJob->queryId); - pMsg->taskId = htobe64(pTask->taskId); - pMsg->refId = htobe64(pJob->refId); - pMsg->execId = htonl(pTask->execId); - pMsg->taskType = TASK_TYPE_TEMP; - pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob); - pMsg->needFetch = SCH_TASK_NEED_FETCH(pTask); - pMsg->phyLen = htonl(pTask->msgLen); - pMsg->sqlLen = htonl(len); - pMsg->msgMask = htonl((pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0); - - memcpy(pMsg->msg, pJob->sql, len); - memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); + if (tSerializeSSubQueryMsg(msg, msgSize, &qMsg) < 0) { + SCH_TASK_ELOG("tSerializeSSubQueryMsg failed, msgSize:%d", msgSize); + taosMemoryFree(msg); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } persistHandle = true; SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); @@ -1092,22 +1102,32 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, break; } case TDMT_SCH_DROP_TASK: { - msgSize = sizeof(STaskDropReq); + STaskDropReq qMsg; + qMsg.header.vgId = addr->nodeId; + qMsg.header.contLen = 0; + qMsg.sId = schMgmt.sId; + qMsg.queryId = pJob->queryId; + qMsg.taskId = pTask->taskId; + qMsg.refId = pJob->refId; + qMsg.execId = pTask->execId; + + msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg); + if (msgSize < 0) { + SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - STaskDropReq *pMsg = msg; - - pMsg->header.vgId = htonl(addr->nodeId); - - pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(pJob->queryId); - pMsg->taskId = htobe64(pTask->taskId); - pMsg->refId = htobe64(pJob->refId); - pMsg->execId = htonl(pTask->execId); + if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) { + SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize); + taosMemoryFree(msg); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } break; } case TDMT_SCH_QUERY_HEARTBEAT: {