From cafc5e4cae25d659a08d58d565c99e05bbdd9507 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 22 Nov 2022 10:43:56 +0800 Subject: [PATCH] enh: refact fetch message --- include/common/tmsg.h | 5 +- source/common/src/tmsg.c | 55 +++++++++++++++++++++ source/libs/executor/src/exchangeoperator.c | 37 +++++++++----- source/libs/qworker/src/qwMsg.c | 19 +++---- source/libs/scheduler/src/schRemote.c | 27 ++++++---- 5 files changed, 109 insertions(+), 34 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7d1c6a91b3..29d68aba14 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1629,7 +1629,6 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq); int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq); void tFreeSSubQueryMsg(SSubQueryMsg *pReq); - typedef struct { SMsgHead header; uint64_t sId; @@ -1667,6 +1666,10 @@ typedef struct { int32_t execId; } SResFetchReq; +int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq); +int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq); + + typedef struct { SMsgHead header; uint64_t sId; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c6ec4cabe5..7575554bcf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4723,6 +4723,61 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) { taosMemoryFreeClear(pReq->msg); } + +int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } + + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeU64(&encoder, pReq->sId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->execId) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); + } + + return tlen + headLen; +} + +int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + + SMsgHead *pHead = buf; + pHead->vgId = pReq->header.vgId; + pHead->contLen = pReq->header.contLen; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + + int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) { if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1; if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index c57a1b38eb..ffbdc1362d 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -403,27 +403,42 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas loadRemoteDataCallback(pWrapper, &pBuf, code); taosMemoryFree(pWrapper); } else { - SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq)); - if (NULL == pMsg) { + SResFetchReq req = {0}; + req.header.vgId = pSource->addr.nodeId; + req.sId = pSource->schedId; + req.taskId = pSource->taskId; + req.queryId = pTaskInfo->id.queryId; + req.execId = pSource->execId; + + int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req); + if (msgSize < 0) { pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; taosMemoryFree(pWrapper); return pTaskInfo->code; } + + void* msg = taosMemoryCalloc(1, msgSize); + if (NULL == msg) { + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + taosMemoryFree(pWrapper); + return pTaskInfo->code; + } + + if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) { + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + taosMemoryFree(pWrapper); + taosMemoryFree(msg); + return pTaskInfo->code; + } qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, sourceIndex, totalSources); - pMsg->header.vgId = htonl(pSource->addr.nodeId); - pMsg->sId = htobe64(pSource->schedId); - pMsg->taskId = htobe64(pSource->taskId); - pMsg->queryId = htobe64(pTaskInfo->id.queryId); - pMsg->execId = htonl(pSource->execId); - // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { - taosMemoryFreeClear(pMsg); + taosMemoryFreeClear(msg); taosMemoryFree(pWrapper); qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; @@ -432,8 +447,8 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas pMsgSendInfo->param = pWrapper; pMsgSendInfo->paramFreeFp = taosMemoryFree; - pMsgSendInfo->msgInfo.pData = pMsg; - pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); + pMsgSendInfo->msgInfo.pData = msg; + pMsgSendInfo->msgInfo.len = msgSize; pMsgSendInfo->msgType = pSource->fetchMsgType; pMsgSendInfo->fp = loadRemoteDataCallback; diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index bb8a7cd140..d9a7cea411 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -499,27 +499,22 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int return TSDB_CODE_QRY_INVALID_INPUT; } - SResFetchReq *msg = pMsg->pCont; + SResFetchReq req = {0}; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1); - if (NULL == msg || pMsg->contLen < sizeof(*msg)) { - QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen); + if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + QW_ELOG("tDeserializeSResFetchReq %d failed", pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = be64toh(msg->sId); - msg->queryId = be64toh(msg->queryId); - msg->taskId = be64toh(msg->taskId); - msg->execId = ntohl(msg->execId); - - uint64_t sId = msg->sId; - uint64_t qId = msg->queryId; - uint64_t tId = msg->taskId; + uint64_t sId = req.sId; + uint64_t qId = req.queryId; + uint64_t tId = req.taskId; int64_t rId = 0; - int32_t eId = msg->execId; + int32_t eId = req.execId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType}; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index b0bc0df850..c154060d21 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1083,22 +1083,29 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, } case TDMT_SCH_FETCH: case TDMT_SCH_MERGE_FETCH: { - msgSize = sizeof(SResFetchReq); + SResFetchReq req = {0}; + req.header.vgId = addr->nodeId; + req.sId = schMgmt.sId; + req.queryId = pJob->queryId; + req.taskId = pTask->taskId; + req.execId = pTask->execId; + + msgSize = tSerializeSResFetchReq(NULL, 0, &req); + if (msgSize < 0) { + SCH_TASK_ELOG("tSerializeSResFetchReq get size, msgSize:%d", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SResFetchReq *pMsg = msg; - - pMsg->header.vgId = htonl(addr->nodeId); - - pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(pJob->queryId); - pMsg->taskId = htobe64(pTask->taskId); - pMsg->execId = htonl(pTask->execId); - + if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) { + SCH_TASK_ELOG("tSerializeSResFetchReq %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } break; } case TDMT_SCH_DROP_TASK: { -- GitLab