diff --git a/include/common/tmsg.h b/include/common/tmsg.h index afe54bce0b1accf5c035398ca981c64911c8f60e..556c4a9a1ff6e82491c17dedbb8938814b5ae103 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3169,8 +3169,7 @@ typedef struct { typedef struct { SMsgHead header; - int32_t msgNum; - SBatchMsg msg[]; + SArray* pMsgs; //SArray } SBatchReq; typedef struct { @@ -3179,17 +3178,40 @@ typedef struct { int32_t msgLen; int32_t rspCode; void* msg; +} SBatchRspMsg; + +typedef struct { + SArray* pRsps; //SArray } SBatchRsp; -static FORCE_INLINE void tFreeSBatchRsp(void* p) { +int32_t tSerializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq); +int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq); +static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) { + if (NULL == msg) { + return; + } + SBatchMsg* pMsg = (SBatchMsg*)msg; + taosMemoryFree(pMsg->msg); +} + +int32_t tSerializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp); +int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp); + +static FORCE_INLINE void tFreeSBatchRspMsg(void* p) { if (NULL == p) { return; } - SBatchRsp* pRsp = (SBatchRsp*)p; + SBatchRspMsg* pRsp = (SBatchRspMsg*)p; taosMemoryFree(pRsp->msg); } +int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq); +int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq); +int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq); +int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq); + + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index d0971b013fcc12519b452b5800f35322acc16b20..5291130e4fb80733dd133bfafe49ba26d2ca2016 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -351,7 +351,7 @@ typedef struct SVgDataBlocks { SVgroupInfo vg; int32_t numOfTables; // number of tables in current submit block uint32_t size; - void* pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ... + void* pData; // SSubmitReq + SSubmitBlk + ... } SVgDataBlocks; typedef void (*FFreeDataBlockHash)(SHashObj*); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ab44236d96ad877988231bcbf15369c14df35855..28b41b97c51d8fb440a26b13b533e14e9316e151 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -728,12 +728,26 @@ void tmqSendHbReq(void* param, void* tmrId) { taosMemoryFree(param); return; } - int64_t consumerId = tmq->consumerId; - int32_t epoch = tmq->epoch; - SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq)); - if (pReq == NULL) goto OVER; - pReq->consumerId = htobe64(consumerId); - pReq->epoch = epoch; + + SMqHbReq req = {0}; + req.consumerId = tmq->consumerId; + req.epoch = tmq->epoch; + + int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); + if (tlen < 0) { + tscError("tSerializeSMqHbReq failed"); + return; + } + void *pReq = taosMemoryCalloc(1, tlen); + if (tlen < 0) { + tscError("failed to malloc MqHbReq msg, size:%d", tlen); + return; + } + if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) { + tscError("tSerializeSMqHbReq %d failed", tlen); + taosMemoryFree(pReq); + return; + } SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { @@ -742,7 +756,7 @@ void tmqSendHbReq(void* param, void* tmrId) { } sendInfo->msgInfo = (SDataBuf){ .pData = pReq, - .len = sizeof(SMqHbReq), + .len = tlen, .handle = NULL, }; @@ -1378,21 +1392,31 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { } atomic_store_32(&tmq->epSkipCnt, 0); #endif - int32_t tlen = sizeof(SMqAskEpReq); - SMqAskEpReq* req = taosMemoryCalloc(1, tlen); - if (req == NULL) { - tscError("failed to malloc get subscribe ep buf"); - /*atomic_store_8(&tmq->epStatus, 0);*/ + SMqAskEpReq req = {0}; + req.consumerId = tmq->consumerId; + req.epoch = tmq->epoch; + strcpy(req.cgroup, tmq->groupId); + + int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); + if (tlen < 0) { + tscError("tSerializeSMqAskEpReq failed"); + return -1; + } + void *pReq = taosMemoryCalloc(1, tlen); + if (tlen < 0) { + tscError("failed to malloc askEpReq msg, size:%d", tlen); + return -1; + } + if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { + tscError("tSerializeSMqAskEpReq %d failed", tlen); + taosMemoryFree(pReq); return -1; } - req->consumerId = htobe64(tmq->consumerId); - req->epoch = htonl(tmq->epoch); - strcpy(req->cgroup, tmq->groupId); SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("failed to malloc subscribe param"); - taosMemoryFree(req); + taosMemoryFree(pReq); /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } @@ -1405,13 +1429,13 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { if (sendInfo == NULL) { tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); - taosMemoryFree(req); + taosMemoryFree(pReq); /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } sendInfo->msgInfo = (SDataBuf){ - .pData = req, + .pData = pReq, .len = tlen, .handle = NULL, }; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4485e8df0c37f14a6e8e87b431d601fcd50305e1..52d9c5b1991001ecf25dc53465d64bdb14166a49 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4446,6 +4446,204 @@ void tFreeSExplainRsp(SExplainRsp *pRsp) { taosMemoryFreeClear(pRsp->subplanInfo); } +int32_t tSerializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } + + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + int32_t num = taosArrayGetSize(pReq->pMsgs); + if (tEncodeI32(&encoder, num) < 0) return -1; + for (int32_t i = 0; i < num; ++i) { + SBatchMsg *pMsg = taosArrayGet(pReq->pMsgs, i); + if (tEncodeI32(&encoder, pMsg->msgIdx) < 0) return -1; + if (tEncodeI32(&encoder, pMsg->msgType) < 0) return -1; + if (tEncodeI32(&encoder, pMsg->msgLen) < 0) return -1; + if (tEncodeBinary(&encoder, pMsg->msg, pMsg->msgLen) < 0) return -1; + } + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); + } + + return tlen + headLen; +} + +int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + + SMsgHead *pHead = buf; + pHead->vgId = pReq->header.vgId; + pHead->contLen = pReq->header.contLen; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); + + if (tStartDecode(&decoder) < 0) return -1; + + int32_t num = 0; + if (tDecodeI32(&decoder, &num) < 0) return -1; + if (num <= 0) { + pReq->pMsgs = NULL; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; + } + + pReq->pMsgs = taosArrayInit(num, sizeof(SBatchMsg)); + if (NULL == pReq->pMsgs) return -1; + for (int32_t i = 0; i < num; ++i) { + SBatchMsg msg = {0}; + if (tDecodeI32(&decoder, &msg.msgIdx) < 0) return -1; + if (tDecodeI32(&decoder, &msg.msgType) < 0) return -1; + if (tDecodeI32(&decoder, &msg.msgLen) < 0) return -1; + if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1; + if (NULL == taosArrayPush(pReq->pMsgs, &msg)) return -1; + } + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + int32_t num = taosArrayGetSize(pRsp->pRsps); + if (tEncodeI32(&encoder, num) < 0) return -1; + for (int32_t i = 0; i < num; ++i) { + SBatchRspMsg *pMsg = taosArrayGet(pRsp->pRsps, i); + if (tEncodeI32(&encoder, pMsg->reqType) < 0) return -1; + if (tEncodeI32(&encoder, pMsg->msgIdx) < 0) return -1; + if (tEncodeI32(&encoder, pMsg->msgLen) < 0) return -1; + if (tEncodeI32(&encoder, pMsg->rspCode) < 0) return -1; + if (tEncodeBinary(&encoder, pMsg->msg, pMsg->msgLen) < 0) return -1; + } + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + return tlen; +} + +int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + int32_t num = 0; + if (tDecodeI32(&decoder, &num) < 0) return -1; + if (num <= 0) { + pRsp->pRsps = NULL; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; + } + + pRsp->pRsps = taosArrayInit(num, sizeof(SBatchRspMsg)); + if (NULL == pRsp->pRsps) return -1; + for (int32_t i = 0; i < num; ++i) { + SBatchRspMsg msg = {0}; + if (tDecodeI32(&decoder, &msg.reqType) < 0) return -1; + if (tDecodeI32(&decoder, &msg.msgIdx) < 0) return -1; + if (tDecodeI32(&decoder, &msg.msgLen) < 0) return -1; + if (tDecodeI32(&decoder, &msg.rspCode) < 0) return -1; + if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1; + if (NULL == taosArrayPush(pRsp->pRsps, &msg)) return -1; + } + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + + +int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->cgroup) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + return tlen; +} + +int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + return tlen; +} + +int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + + int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 62ad5bae157f9cfd15a23caa971424c0b7c92250..300251d64d0626859fdb5a79fc7df81bcb016ca4 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -325,9 +325,14 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; - SMqHbReq *pReq = (SMqHbReq *)pMsg->pCont; - int64_t consumerId = be64toh(pReq->consumerId); + SMqHbReq req = {0}; + if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int64_t consumerId = req.consumerId; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { mError("consumer %" PRId64 " not exist", consumerId); @@ -359,10 +364,16 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; - SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->pCont; + SMqAskEpReq req = {0}; SMqAskEpRsp rsp = {0}; - int64_t consumerId = be64toh(pReq->consumerId); - int32_t epoch = ntohl(pReq->epoch); + + if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int64_t consumerId = req.consumerId; + int32_t epoch = req.epoch; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { @@ -370,7 +381,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { return -1; } - ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); + ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0); atomic_store_32(&pConsumer->hbStatus, 0); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 3a7c25f7f96094aa5ceb82d8526d00c30c07d2a6..5278fc776107c31e43375b16b708c0c8ad34901d 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -63,81 +63,60 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { return code; } + +static FORCE_INLINE void mnodeFreeSBatchRspMsg(void* p) { + if (NULL == p) { + return; + } + + SBatchRspMsg* pRsp = (SBatchRspMsg*)p; + rpcFreeCont(pRsp->msg); +} + int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { int32_t code = 0; - int32_t offset = 0; int32_t rspSize = 0; - SBatchReq *batchReq = (SBatchReq *)pMsg->pCont; - int32_t msgNum = ntohl(batchReq->msgNum); - offset += sizeof(SBatchReq); + SBatchReq batchReq = {0}; SBatchMsg req = {0}; - SBatchRsp rsp = {0}; + SBatchRspMsg rsp = {0}; + SBatchRsp batchRsp = {0}; SRpcMsg reqMsg = *pMsg; - SRpcMsg rspMsg = {0}; void *pRsp = NULL; SMnode *pMnode = pMsg->info.node; + if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) { + code = TSDB_CODE_OUT_OF_MEMORY; + mError("tDeserializeSBatchReq failed"); + goto _exit; + } + + int32_t msgNum = taosArrayGetSize(batchReq.pMsgs); if (msgNum >= MAX_META_MSG_IN_BATCH) { code = TSDB_CODE_INVALID_MSG; mError("too many msgs %d in mnode batch meta req", msgNum); goto _exit; } - SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); - if (NULL == batchRsp) { + batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg)); + if (NULL == batchRsp.pRsps) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } for (int32_t i = 0; i < msgNum; ++i) { - if (offset >= pMsg->contLen) { - mError("offset %d is bigger than contLen %d", offset, pMsg->contLen); - terrno = TSDB_CODE_INVALID_MSG_LEN; - taosArrayDestroy(batchRsp); - return -1; - } - - req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); - offset += sizeof(req.msgIdx); - if (offset >= pMsg->contLen) { - mError("offset %d is bigger than contLen %d", offset, pMsg->contLen); - terrno = TSDB_CODE_INVALID_MSG_LEN; - taosArrayDestroy(batchRsp); - return -1; - } - - req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); - offset += sizeof(req.msgType); - if (offset >= pMsg->contLen) { - mError("offset %d is bigger than contLen %d", offset, pMsg->contLen); - terrno = TSDB_CODE_INVALID_MSG_LEN; - taosArrayDestroy(batchRsp); - return -1; - } + SBatchMsg* req = taosArrayGet(batchReq.pMsgs, i); - req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); - offset += sizeof(req.msgLen); - if (offset >= pMsg->contLen) { - mError("offset %d is bigger than contLen %d", offset, pMsg->contLen); - terrno = TSDB_CODE_INVALID_MSG_LEN; - taosArrayDestroy(batchRsp); - return -1; - } - - req.msg = (char *)pMsg->pCont + offset; - offset += req.msgLen; - - reqMsg.msgType = req.msgType; - reqMsg.pCont = req.msg; - reqMsg.contLen = req.msgLen; + reqMsg.msgType = req->msgType; + reqMsg.pCont = req->msg; + reqMsg.contLen = req->msgLen; reqMsg.info.rsp = NULL; reqMsg.info.rspLen = 0; - MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req.msgType)]; + MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req->msgType)]; if (fp == NULL) { mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_MSG_NOT_PROCESSED; - taosArrayDestroy(batchRsp); + taosArrayDestroy(batchRsp.pRsps); return -1; } @@ -146,49 +125,29 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { } else { rsp.rspCode = 0; } - rsp.msgIdx = req.msgIdx; + rsp.msgIdx = req->msgIdx; rsp.reqType = reqMsg.msgType; rsp.msgLen = reqMsg.info.rspLen; rsp.msg = reqMsg.info.rsp; - taosArrayPush(batchRsp, &rsp); - - rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES; + taosArrayPush(batchRsp.pRsps, &rsp); } - rspSize += sizeof(int32_t); - offset = 0; - + rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp); + if (rspSize < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } pRsp = rpcMallocCont(rspSize); if (pRsp == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - - *(int32_t *)((char *)pRsp + offset) = htonl(msgNum); - offset += sizeof(msgNum); - for (int32_t i = 0; i < msgNum; ++i) { - SBatchRsp *p = taosArrayGet(batchRsp, i); - - *(int32_t *)((char *)pRsp + offset) = htonl(p->reqType); - offset += sizeof(p->reqType); - *(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx); - offset += sizeof(p->msgIdx); - *(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen); - offset += sizeof(p->msgLen); - *(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode); - offset += sizeof(p->rspCode); - if (p->msg != NULL) { - memcpy((char *)pRsp + offset, p->msg, p->msgLen); - offset += p->msgLen; - } - - rpcFreeCont(p->msg); + if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp) < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } - taosArrayDestroy(batchRsp); - batchRsp = NULL; - _exit: pMsg->info.rsp = pRsp; @@ -198,7 +157,8 @@ _exit: mError("mnd get batch meta failed cause of %s", tstrerror(code)); } - taosArrayDestroyEx(batchRsp, tFreeSBatchRsp); + taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg); + taosArrayDestroyEx(batchRsp.pRsps, mnodeFreeSBatchRspMsg); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 15769ef4c900625da55dca6b03722a3ab0cabd0c..8e9aab0afd129dbf1fab3d301d6da406590db3d2 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -264,77 +264,55 @@ _exit: return TSDB_CODE_SUCCESS; } +static FORCE_INLINE void vnodeFreeSBatchRspMsg(void* p) { + if (NULL == p) { + return; + } + + SBatchRspMsg* pRsp = (SBatchRspMsg*)p; + rpcFreeCont(pRsp->msg); +} + + int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; - int32_t offset = 0; int32_t rspSize = 0; - SBatchReq *batchReq = (SBatchReq *)pMsg->pCont; - int32_t msgNum = ntohl(batchReq->msgNum); - offset += sizeof(SBatchReq); - SBatchMsg req = {0}; - SBatchRsp rsp = {0}; + SBatchReq batchReq = {0}; + SBatchMsg *req = NULL; + SBatchRspMsg rsp = {0}; + SBatchRsp batchRsp = {0}; SRpcMsg reqMsg = *pMsg; SRpcMsg rspMsg = {0}; void *pRsp = NULL; + if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) { + code = TSDB_CODE_OUT_OF_MEMORY; + qError("tDeserializeSBatchReq failed"); + goto _exit; + } + + int32_t msgNum = taosArrayGetSize(batchReq.pMsgs); if (msgNum >= MAX_META_MSG_IN_BATCH) { code = TSDB_CODE_INVALID_MSG; qError("too many msgs %d in vnode batch meta req", msgNum); goto _exit; } - SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); - if (NULL == batchRsp) { + batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg)); + if (NULL == batchRsp.pRsps) { code = TSDB_CODE_OUT_OF_MEMORY; + qError("taosArrayInit %d SBatchRspMsg failed", msgNum); goto _exit; } for (int32_t i = 0; i < msgNum; ++i) { - if (offset >= pMsg->contLen) { - qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen); - terrno = TSDB_CODE_INVALID_MSG_LEN; - taosArrayDestroy(batchRsp); - return -1; - } - - req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); - offset += sizeof(req.msgIdx); - - if (offset >= pMsg->contLen) { - qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen); - terrno = TSDB_CODE_INVALID_MSG_LEN; - taosArrayDestroy(batchRsp); - return -1; - } - - req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); - offset += sizeof(req.msgType); - - if (offset >= pMsg->contLen) { - qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen); - terrno = TSDB_CODE_INVALID_MSG_LEN; - taosArrayDestroy(batchRsp); - return -1; - } - - req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); - offset += sizeof(req.msgLen); - - if (offset >= pMsg->contLen) { - qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen); - terrno = TSDB_CODE_INVALID_MSG_LEN; - taosArrayDestroy(batchRsp); - return -1; - } - - req.msg = (char *)pMsg->pCont + offset; - offset += req.msgLen; - - reqMsg.msgType = req.msgType; - reqMsg.pCont = req.msg; - reqMsg.contLen = req.msgLen; - - switch (req.msgType) { + req = taosArrayGet(batchReq.pMsgs, i); + + reqMsg.msgType = req->msgType; + reqMsg.pCont = req->msg; + reqMsg.contLen = req->msgLen; + + switch (req->msgType) { case TDMT_VND_TABLE_META: vnodeGetTableMeta(pVnode, &reqMsg, false); break; @@ -342,63 +320,40 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { vnodeGetTableCfg(pVnode, &reqMsg, false); break; default: - qError("invalid req msgType %d", req.msgType); + qError("invalid req msgType %d", req->msgType); reqMsg.code = TSDB_CODE_INVALID_MSG; reqMsg.pCont = NULL; reqMsg.contLen = 0; break; } - rsp.msgIdx = req.msgIdx; + rsp.msgIdx = req->msgIdx; rsp.reqType = reqMsg.msgType; rsp.msgLen = reqMsg.contLen; rsp.rspCode = reqMsg.code; rsp.msg = reqMsg.pCont; - taosArrayPush(batchRsp, &rsp); - - rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES; + taosArrayPush(batchRsp.pRsps, &rsp); } - rspSize += sizeof(int32_t); - offset = 0; - - if (rspSize > MAX_META_BATCH_RSP_SIZE) { - qError("rspSize:%d overload", rspSize); - code = TSDB_CODE_INVALID_MSG_LEN; + rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp); + if (rspSize < 0) { + qError("tSerializeSBatchRsp failed"); + code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - pRsp = rpcMallocCont(rspSize); if (pRsp == NULL) { + qError("rpcMallocCont %d failed", rspSize); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - - *(int32_t *)((char *)pRsp + offset) = htonl(msgNum); - offset += sizeof(msgNum); - for (int32_t i = 0; i < msgNum; ++i) { - SBatchRsp *p = taosArrayGet(batchRsp, i); - - *(int32_t *)((char *)pRsp + offset) = htonl(p->reqType); - offset += sizeof(p->reqType); - *(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx); - offset += sizeof(p->msgIdx); - *(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen); - offset += sizeof(p->msgLen); - *(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode); - offset += sizeof(p->rspCode); - if (p->msg) { - memcpy((char *)pRsp + offset, p->msg, p->msgLen); - offset += p->msgLen; - } - - taosMemoryFreeClear(p->msg); + if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp) < 0) { + qError("tSerializeSBatchRsp %d failed", rspSize); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } - taosArrayDestroy(batchRsp); - batchRsp = NULL; - _exit: rspMsg.info = pMsg->info; @@ -411,7 +366,8 @@ _exit: qError("vnd get batch meta failed cause of %s", tstrerror(code)); } - taosArrayDestroyEx(batchRsp, tFreeSBatchRsp); + taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg); + taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg); tmsgSendRsp(&rspMsg); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 4d3a3a1ab41ea01090ff7a350f7d9638978de1a3..c95d0fe4620f7a068813701588da77d9d149a936 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -234,7 +234,6 @@ typedef struct SCatalog { typedef struct SCtgBatch { int32_t batchId; int32_t msgType; - int32_t msgSize; SArray* pMsgs; SRequestConnInfo conn; char dbFName[TSDB_DB_FNAME_LEN]; diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 23bccb0835eee85781c4fcbfe5f152119ce5acbf..753c4ce9171a7acc6a1b5d065751b0ca32268972 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -26,19 +26,29 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu SCatalog* pCtg = pJob->pCtg; int32_t taskNum = taosArrayGetSize(cbParam->taskId); SDataBuf taskMsg = *pMsg; - int32_t offset = 0; - int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; + int32_t msgNum = 0; + SBatchRsp batchRsp = {0}; + SBatchRspMsg rsp = {0}; + SBatchRspMsg *pRsp = NULL; + + if (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) { + if (tDeserializeSBatchRsp(pMsg->pData, pMsg->len, &batchRsp) < 0) { + ctgError("tDeserializeSBatchRsp failed, msgLen:%d", pMsg->len); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + msgNum = taosArrayGetSize(batchRsp.pRsps); + } + ASSERT(taskNum == msgNum || 0 == msgNum); ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); - offset += sizeof(msgNum); - SBatchRsp rsp = {0}; SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == pBatchs) { ctgError("taosHashInit %d batch failed", taskNum); - CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } for (int32_t i = 0; i < taskNum; ++i) { @@ -46,25 +56,18 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu int32_t* msgIdx = taosArrayGet(cbParam->msgIdx, i); SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId); if (msgNum > 0) { - rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); - offset += sizeof(rsp.reqType); - rsp.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); - offset += sizeof(rsp.msgIdx); - rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); - offset += sizeof(rsp.msgLen); - rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); - offset += sizeof(rsp.rspCode); - rsp.msg = ((char*)pMsg->pData) + offset; - offset += rsp.msgLen; - - taskMsg.msgType = rsp.reqType; - taskMsg.pData = rsp.msg; - taskMsg.len = rsp.msgLen; - - ASSERT(rsp.msgIdx == *msgIdx); + pRsp = taosArrayGet(batchRsp.pRsps, i); + + taskMsg.msgType = pRsp->reqType; + taskMsg.pData = pRsp->msg; + taskMsg.len = pRsp->msgLen; + + ASSERT(pRsp->msgIdx == *msgIdx); } else { - rsp.msgIdx = *msgIdx; - rsp.reqType = -1; + pRsp = &rsp; + pRsp->msgIdx = *msgIdx; + pRsp->reqType = -1; + pRsp->rspCode = 0; taskMsg.msgType = -1; taskMsg.pData = NULL; taskMsg.len = 0; @@ -72,20 +75,22 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = rsp.msgIdx; + tReq.msgIdx = pRsp->msgIdx; SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx); pMsgCtx->pBatchs = pBatchs; ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, - rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); + pRsp->msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); - (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); + (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, pRsp->reqType, &taskMsg, (pRsp->rspCode ? pRsp->rspCode : rspCode)); } CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); _return: + taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg); + ctgFreeBatchs(pBatchs); CTG_RET(code); } @@ -481,7 +486,6 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT if (NULL == taosArrayPush(newBatch.pMsgIdxs, &req.msgIdx)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES; if (vgId > 0) { SName* pName = NULL; @@ -533,8 +537,6 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES; - if (vgId > 0) { SName* pName = NULL; if (TDMT_VND_TABLE_CFG == msgType) { @@ -570,38 +572,35 @@ _return: return code; } -int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { - *msg = taosMemoryCalloc(1, pBatch->msgSize); - if (NULL == (*msg)) { - CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); - } - - int32_t offset = 0; +int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t *pSize) { int32_t num = taosArrayGetSize(pBatch->pMsgs); - SBatchReq* pBatchReq = (SBatchReq*)(*msg); - if (num >= CTG_MAX_REQ_IN_BATCH) { qError("too many msgs %d in one batch request", num); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - pBatchReq->header.vgId = htonl(vgId); - pBatchReq->msgNum = htonl(num); - offset += sizeof(SBatchReq); - - for (int32_t i = 0; i < num; ++i) { - SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); - *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgIdx); - offset += sizeof(pReq->msgIdx); - *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType); - offset += sizeof(pReq->msgType); - *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen); - offset += sizeof(pReq->msgLen); - memcpy((char*)(*msg) + offset, pReq->msg, pReq->msgLen); - offset += pReq->msgLen; + SBatchReq batchReq = {0}; + + batchReq.header.vgId = vgId; + batchReq.pMsgs = pBatch->pMsgs; + + int32_t msgSize = tSerializeSBatchReq(NULL, 0, &batchReq); + if (msgSize < 0) { + qError("tSerializeSBatchReq failed"); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + *msg = taosMemoryCalloc(1, msgSize); + if (NULL == (*msg)) { + qError("calloc batchReq msg failed, size:%d", msgSize); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + if (tSerializeSBatchReq(*msg, msgSize, &batchReq) < 0) { + qError("tSerializeSBatchReq failed"); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - ASSERT(pBatch->msgSize == offset); + *pSize = msgSize; qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, num); @@ -616,12 +615,13 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) { size_t len = 0; int32_t* vgId = taosHashGetKey(p, &len); SCtgBatch* pBatch = (SCtgBatch*)p; + int32_t msgSize = 0; ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); - CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); + CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg, &msgSize)); code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs, - pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize); + pBatch->dbFName, *vgId, pBatch->msgType, msg, msgSize); pBatch->pTaskIds = NULL; CTG_ERR_JRET(code); diff --git a/source/util/src/thash.c b/source/util/src/thash.c index c3d4668e1146226d8420d43ae624547c7f35bd37..f27c4c9be10c4d019cc3cbf75fa2f32aaa7e039e 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -147,7 +147,7 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr uint32_t hashVal) { SHashNode *pNode = pe->next; while (pNode) { - atomic_add_fetch_64(&pHashObj->compTimes, 1); + //atomic_add_fetch_64(&pHashObj->compTimes, 1); if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) { assert(pNode->hashVal == hashVal);