From 03c026a54149f84a8087a49f9336d7123d83a657 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Feb 2022 13:36:03 +0800 Subject: [PATCH] serialize status msg --- include/common/tmsg.h | 8 +- source/common/src/tmsg.c | 174 ++++++++++++++----------- source/dnode/mgmt/impl/src/dndMgmt.c | 8 +- source/dnode/mnode/impl/src/mndDnode.c | 10 +- 4 files changed, 115 insertions(+), 85 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 59f8d38345..6f35dd797e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -696,8 +696,8 @@ typedef struct { SArray* pVloads; // array of SVnodeLoad } SStatusReq; -int32_t tSerializeSStatusReq(void** buf, SStatusReq* pReq); -void* tDeserializeSStatusReq(void* buf, SStatusReq* pReq); +int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); +int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); typedef struct { int32_t dnodeId; @@ -716,8 +716,8 @@ typedef struct { SArray* pDnodeEps; // Array of SDnodeEp } SStatusRsp; -int32_t tSerializeSStatusRsp(void** buf, SStatusRsp* pRsp); -void* tDeserializeSStatusRsp(void* buf, SStatusRsp* pRsp); +int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); +int32_t tDeserializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); typedef struct { int32_t reserve; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e763c0c85f..144054d078 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -583,144 +583,172 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) { pReq->pFields = NULL; } -int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) { - int32_t tlen = 0; +int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; // status - tlen += taosEncodeFixedI32(buf, pReq->sver); - tlen += taosEncodeFixedI64(buf, pReq->dver); - tlen += taosEncodeFixedI32(buf, pReq->dnodeId); - tlen += taosEncodeFixedI64(buf, pReq->clusterId); - tlen += taosEncodeFixedI64(buf, pReq->rebootTime); - tlen += taosEncodeFixedI64(buf, pReq->updateTime); - tlen += taosEncodeFixedI32(buf, pReq->numOfCores); - tlen += taosEncodeFixedI32(buf, pReq->numOfSupportVnodes); - tlen += taosEncodeString(buf, pReq->dnodeEp); + if (tEncodeI32(&encoder, pReq->sver) < 0) return -1; + if (tEncodeI64(&encoder, pReq->dver) < 0) return -1; + if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; + if (tEncodeI64(&encoder, pReq->clusterId) < 0) return -1; + if (tEncodeI64(&encoder, pReq->rebootTime) < 0) return -1; + if (tEncodeI64(&encoder, pReq->updateTime) < 0) return -1; + if (tEncodeI32(&encoder, pReq->numOfCores) < 0) return -1; + if (tEncodeI32(&encoder, pReq->numOfSupportVnodes) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->dnodeEp) < 0) return -1; // cluster cfg - tlen += taosEncodeFixedI32(buf, pReq->clusterCfg.statusInterval); - tlen += taosEncodeFixedI64(buf, pReq->clusterCfg.checkTime); - tlen += taosEncodeString(buf, pReq->clusterCfg.timezone); - tlen += taosEncodeString(buf, pReq->clusterCfg.locale); - tlen += taosEncodeString(buf, pReq->clusterCfg.charset); + if (tEncodeI32(&encoder, pReq->clusterCfg.statusInterval) < 0) return -1; + if (tEncodeI64(&encoder, pReq->clusterCfg.checkTime) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->clusterCfg.timezone) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->clusterCfg.locale) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->clusterCfg.charset) < 0) return -1; // vnode loads int32_t vlen = (int32_t)taosArrayGetSize(pReq->pVloads); - tlen += taosEncodeFixedI32(buf, vlen); + if (tEncodeI32(&encoder, vlen) < 0) return -1; for (int32_t i = 0; i < vlen; ++i) { SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i); - tlen += taosEncodeFixedI32(buf, pload->vgId); - tlen += taosEncodeFixedI8(buf, pload->role); - tlen += taosEncodeFixedI64(buf, pload->numOfTables); - tlen += taosEncodeFixedI64(buf, pload->numOfTimeSeries); - tlen += taosEncodeFixedI64(buf, pload->totalStorage); - tlen += taosEncodeFixedI64(buf, pload->compStorage); - tlen += taosEncodeFixedI64(buf, pload->pointsWritten); + if (tEncodeI32(&encoder, pload->vgId) < 0) return -1; + if (tEncodeI8(&encoder, pload->role) < 0) return -1; + if (tEncodeI64(&encoder, pload->numOfTables) < 0) return -1; + if (tEncodeI64(&encoder, pload->numOfTimeSeries) < 0) return -1; + if (tEncodeI64(&encoder, pload->totalStorage) < 0) return -1; + if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1; + if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1; } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); return tlen; } -void *tDeserializeSStatusReq(void *buf, SStatusReq *pReq) { +int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + // status - buf = taosDecodeFixedI32(buf, &pReq->sver); - buf = taosDecodeFixedI64(buf, &pReq->dver); - buf = taosDecodeFixedI32(buf, &pReq->dnodeId); - buf = taosDecodeFixedI64(buf, &pReq->clusterId); - buf = taosDecodeFixedI64(buf, &pReq->rebootTime); - buf = taosDecodeFixedI64(buf, &pReq->updateTime); - buf = taosDecodeFixedI32(buf, &pReq->numOfCores); - buf = taosDecodeFixedI32(buf, &pReq->numOfSupportVnodes); - buf = taosDecodeStringTo(buf, pReq->dnodeEp); + if (tDecodeI32(&decoder, &pReq->sver) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->dver) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->clusterId) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->rebootTime) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->updateTime) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->numOfCores) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->numOfSupportVnodes) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->dnodeEp) < 0) return -1; // cluster cfg - buf = taosDecodeFixedI32(buf, &pReq->clusterCfg.statusInterval); - buf = taosDecodeFixedI64(buf, &pReq->clusterCfg.checkTime); - buf = taosDecodeStringTo(buf, pReq->clusterCfg.timezone); - buf = taosDecodeStringTo(buf, pReq->clusterCfg.locale); - buf = taosDecodeStringTo(buf, pReq->clusterCfg.charset); + if (tDecodeI32(&decoder, &pReq->clusterCfg.statusInterval) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->clusterCfg.checkTime) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->clusterCfg.timezone) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->clusterCfg.locale) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->clusterCfg.charset) < 0) return -1; // vnode loads int32_t vlen = 0; - buf = taosDecodeFixedI32(buf, &vlen); + if (tDecodeI32(&decoder, &vlen) < 0) return -1; pReq->pVloads = taosArrayInit(vlen, sizeof(SVnodeLoad)); if (pReq->pVloads == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return -1; } for (int32_t i = 0; i < vlen; ++i) { SVnodeLoad vload = {0}; - buf = taosDecodeFixedI32(buf, &vload.vgId); - buf = taosDecodeFixedI8(buf, &vload.role); - buf = taosDecodeFixedI64(buf, &vload.numOfTables); - buf = taosDecodeFixedI64(buf, &vload.numOfTimeSeries); - buf = taosDecodeFixedI64(buf, &vload.totalStorage); - buf = taosDecodeFixedI64(buf, &vload.compStorage); - buf = taosDecodeFixedI64(buf, &vload.pointsWritten); + if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1; + if (tDecodeI8(&decoder, &vload.role) < 0) return -1; + if (tDecodeI64(&decoder, &vload.numOfTables) < 0) return -1; + if (tDecodeI64(&decoder, &vload.numOfTimeSeries) < 0) return -1; + if (tDecodeI64(&decoder, &vload.totalStorage) < 0) return -1; + if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1; + if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1; if (taosArrayPush(pReq->pVloads, &vload) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return -1; } } - return buf; + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; } -int32_t tSerializeSStatusRsp(void **buf, SStatusRsp *pRsp) { - int32_t tlen = 0; +int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); - // status; - tlen += taosEncodeFixedI64(buf, pRsp->dver); + if (tStartEncode(&encoder) < 0) return -1; + + // status + if (tEncodeI64(&encoder, pRsp->dver) < 0) return -1; // dnode cfg - tlen += taosEncodeFixedI32(buf, pRsp->dnodeCfg.dnodeId); - tlen += taosEncodeFixedI64(buf, pRsp->dnodeCfg.clusterId); + if (tEncodeI32(&encoder, pRsp->dnodeCfg.dnodeId) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->dnodeCfg.clusterId) < 0) return -1; // dnode eps int32_t dlen = (int32_t)taosArrayGetSize(pRsp->pDnodeEps); - tlen += taosEncodeFixedI32(buf, dlen); + if (tEncodeI32(&encoder, dlen) < 0) return -1; for (int32_t i = 0; i < dlen; ++i) { SDnodeEp *pDnodeEp = taosArrayGet(pRsp->pDnodeEps, i); - tlen += taosEncodeFixedI32(buf, pDnodeEp->id); - tlen += taosEncodeFixedI8(buf, pDnodeEp->isMnode); - tlen += taosEncodeString(buf, pDnodeEp->ep.fqdn); - tlen += taosEncodeFixedU16(buf, pDnodeEp->ep.port); + if (tEncodeI32(&encoder, pDnodeEp->id) < 0) return -1; + if (tEncodeI8(&encoder, pDnodeEp->isMnode) < 0) return -1; + if (tEncodeCStr(&encoder, pDnodeEp->ep.fqdn) < 0) return -1; + if (tEncodeU16(&encoder, pDnodeEp->ep.port) < 0) return -1; } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); return tlen; } -void *tDeserializeSStatusRsp(void *buf, SStatusRsp *pRsp) { +int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + // status - buf = taosDecodeFixedI64(buf, &pRsp->dver); + if (tDecodeI64(&decoder, &pRsp->dver) < 0) return -1; // cluster cfg - buf = taosDecodeFixedI32(buf, &pRsp->dnodeCfg.dnodeId); - buf = taosDecodeFixedI64(buf, &pRsp->dnodeCfg.clusterId); + if (tDecodeI32(&decoder, &pRsp->dnodeCfg.dnodeId) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->dnodeCfg.clusterId) < 0) return -1; // dnode eps int32_t dlen = 0; - buf = taosDecodeFixedI32(buf, &dlen); + if (tDecodeI32(&decoder, &dlen) < 0) return -1; pRsp->pDnodeEps = taosArrayInit(dlen, sizeof(SDnodeEp)); if (pRsp->pDnodeEps == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return -1; } for (int32_t i = 0; i < dlen; ++i) { SDnodeEp dnodeEp = {0}; - buf = taosDecodeFixedI32(buf, &dnodeEp.id); - buf = taosDecodeFixedI8(buf, &dnodeEp.isMnode); - buf = taosDecodeStringTo(buf, dnodeEp.ep.fqdn); - buf = taosDecodeFixedU16(buf, &dnodeEp.ep.port); + if (tDecodeI32(&decoder, &dnodeEp.id) < 0) return -1; + if (tDecodeI8(&decoder, &dnodeEp.isMnode) < 0) return -1; + if (tDecodeCStrTo(&decoder, dnodeEp.ep.fqdn) < 0) return -1; + if (tDecodeU16(&decoder, &dnodeEp.ep.port) < 0) return -1; if (taosArrayPush(pRsp->pDnodeEps, &dnodeEp) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return -1; } } - return buf; + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; } int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) { diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 41dd54a0ff..ff8efb6a1d 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -379,10 +379,9 @@ void dndSendStatusReq(SDnode *pDnode) { req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); dndGetVnodeLoads(pDnode, req.pVloads); - int32_t contLen = tSerializeSStatusReq(NULL, &req); + int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); - void *pBuf = pHead; - tSerializeSStatusReq(&pBuf, &req); + tSerializeSStatusReq(pHead, contLen, &req); taosArrayDestroy(req.pVloads); SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; @@ -437,7 +436,8 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { } } else { SStatusRsp statusRsp = {0}; - if (pRsp->pCont != NULL && pRsp->contLen != 0 && tDeserializeSStatusRsp(pRsp->pCont, &statusRsp) != NULL) { + if (pRsp->pCont != NULL && pRsp->contLen != 0 && + tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { pMgmt->dver = statusRsp.dver; dndUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg); dndUpdateDnodeEps(pDnode, statusRsp.pDnodeEps); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index bad0aa8a0b..07b38bdc6a 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -302,7 +302,10 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { SDnodeObj *pDnode = NULL; int32_t code = -1; - if (tDeserializeSStatusReq(pReq->rpcMsg.pCont, &statusReq) == NULL) goto PROCESS_STATUS_MSG_OVER; + if (tDeserializeSStatusReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &statusReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto PROCESS_STATUS_MSG_OVER; + } if (statusReq.dnodeId == 0) { pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); @@ -410,10 +413,9 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { mndGetDnodeData(pMnode, statusRsp.pDnodeEps); - int32_t contLen = tSerializeSStatusRsp(NULL, &statusRsp); + int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp); void *pHead = rpcMallocCont(contLen); - void *pBuf = pHead; - tSerializeSStatusRsp(&pBuf, &statusRsp); + tSerializeSStatusRsp(pHead, contLen, &statusRsp); taosArrayDestroy(statusRsp.pDnodeEps); pReq->contLen = contLen; -- GitLab