From 3f88dbe256c610c7fbac22864b0637de67ae7a1b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 18 Nov 2022 11:43:03 +0800 Subject: [PATCH] enh: refact client server messages --- include/common/tmsg.h | 3 +++ source/client/src/clientTmq.c | 2 +- source/common/src/tmsg.c | 37 +++++++++++++++++++++++++++++++++ source/libs/qworker/src/qwMsg.c | 32 +++++++++++++++++++++------- 4 files changed, 66 insertions(+), 8 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9b7f3bff4c..69d272386e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1741,6 +1741,9 @@ typedef struct { int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq); int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq); +int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); +int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); + typedef struct { int32_t code; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 1fe89bcafd..28b41b97c5 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -756,7 +756,7 @@ void tmqSendHbReq(void* param, void* tmrId) { } sendInfo->msgInfo = (SDataBuf){ .pData = pReq, - .len = sizeof(SMqHbReq), + .len = tlen, .handle = NULL, }; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8ddb60655e..1164c88477 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4778,6 +4778,43 @@ int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) return 0; } +int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI32(&encoder, pRsp->code) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->tbFName) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->sversion) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->tversion) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->affectedRows) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + return tlen; +} + +int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->tbFName) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t headLen = sizeof(SMsgHead); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 82e71bc8d0..71f24f8acd 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -65,19 +65,37 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) { STbVerInfo *tbInfo = ctx ? &ctx->tbInfo : NULL; int64_t affectedRows = ctx ? ctx->affectedRows : 0; - SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = htonl(code); - pRsp->affectedRows = htobe64(affectedRows); + SQueryTableRsp rsp = {0}; + rsp.code = code; + rsp.affectedRows = affectedRows; + if (tbInfo) { - strcpy(pRsp->tbFName, tbInfo->tbFName); - pRsp->sversion = htonl(tbInfo->sversion); - pRsp->tversion = htonl(tbInfo->tversion); + strcpy(rsp.tbFName, tbInfo->tbFName); + rsp.sversion = tbInfo->sversion; + rsp.tversion = tbInfo->tversion; + } + + int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp); + if (msgSize < 0) { + qError("tSerializeSQueryTableRsp failed"); + QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + void *pRsp = rpcMallocCont(msgSize); + if (NULL == pRsp) { + qError("rpcMallocCont %d failed", msgSize); + QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (tSerializeSQueryTableRsp(pRsp, msgSize, &rsp) < 0) { + qError("tSerializeSQueryTableRsp %d failed", msgSize); + QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } SRpcMsg rpcRsp = { .msgType = rspType, .pCont = pRsp, - .contLen = sizeof(*pRsp), + .contLen = msgSize, .code = code, .info = *pConn, }; -- GitLab