diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fe20b5de70839c118f3f8d5249fa079489e8bb8a..8ebc8b80f256a782a63a05878941030f8479489d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -895,7 +895,7 @@ void tFreeSShowReq(SShowReq* pReq); typedef struct { int64_t showId; STableMetaRsp tableMeta; -} SShowRsp; +} SShowRsp, SVShowTablesRsp; int32_t tSerializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp); int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp); @@ -1428,11 +1428,6 @@ typedef struct { SMsgHead head; } SVShowTablesReq; -typedef struct { - int64_t id; - STableMetaRsp metaInfo; -} SVShowTablesRsp; - typedef struct { SMsgHead head; int32_t id; @@ -1637,7 +1632,7 @@ int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchR static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) { if (tEncodeI32(pEncoder, pKv->key) < 0) return -1; if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1; - if (tEncodeCStrWithLen(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1; + if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1; return 0; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ccebc71ae82a533c230ad81a79cd7b68544512f2..acbefc6bea9dd0b26e20cf58d770840d1d3bf72d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1485,7 +1485,7 @@ int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) { if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeI32(&encoder, pReq->payloadLen) < 0) return -1; if (pReq->payloadLen > 0) { - if (tEncodeCStr(&encoder, pReq->payload) < 0) return -1; + if (tEncodeBinary(&encoder, pReq->payload, pReq->payloadLen) < 0) return -1; } tEndEncode(&encoder); @@ -1706,12 +1706,13 @@ void tFreeSShowRsp(SShowRsp *pRsp) { tFreeSTableMetaRsp(&pRsp->tableMeta); } int32_t tSerializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq) { int32_t headLen = sizeof(SMsgHead); - SCoder encoder = {0}; - tCoderInit(&encoder, TD_LITTLE_ENDIAN, (char *)buf + headLen, bufLen - headLen, TD_ENCODER); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } - SMsgHead *pHead = &pReq->header; - pHead->vgId = htonl(pHead->vgId); - pHead->contLen = htonl(pHead->vgId); + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1; @@ -1720,16 +1721,25 @@ int32_t tSerializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq) int32_t tlen = encoder.pos; tCoderClear(&encoder); - return tlen; + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); + } + + return tlen + headLen; } int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq) { - SMsgHead *pHead = &pReq->header; - pHead->vgId = htonl(pHead->vgId); - pHead->contLen = htonl(pHead->vgId); + int32_t headLen = sizeof(SMsgHead); + + SMsgHead *pHead = buf; + pHead->vgId = pReq->header.vgId; + pHead->contLen = pReq->header.contLen; SCoder decoder = {0}; - tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + tCoderInit(&decoder, TD_LITTLE_ENDIAN, (char *)buf + headLen, bufLen - headLen, TD_DECODER); if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index c74525c36a82bb4cb6faf2f01bf244d76fba8201..3198ded37e0e0b8534dd79d6f302c749cfde3f90 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -120,6 +120,7 @@ static int32_t mndProcessShowReq(SMnodeMsg *pReq) { SShowMgmt *pMgmt = &pMnode->showMgmt; int32_t code = -1; SShowReq showReq = {0}; + SShowRsp showRsp = {0}; if (tDeserializeSShowReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &showReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -142,7 +143,6 @@ static int32_t mndProcessShowReq(SMnodeMsg *pReq) { goto SHOW_OVER; } - SShowRsp showRsp = {0}; showRsp.showId = pShow->id; showRsp.tableMeta.pSchemas = calloc(TSDB_MAX_COLUMNS, sizeof(SSchema)); if (showRsp.tableMeta.pSchemas == NULL) { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 8689c4d010674d10fef36362c5037f7a63583245..436b0b996133d413eb769fb1780cd43926f407ec 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1183,7 +1183,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa taosRLockLatch(&pStb->lock); int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; - pRsp->pSchemas = malloc(totalCols * sizeof(SSchema)); + pRsp->pSchemas = calloc(totalCols, sizeof(SSchema)); if (pRsp->pSchemas == NULL) { taosRUnLockLatch(&pStb->lock); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1222,6 +1222,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa } taosRUnLockLatch(&pStb->lock); + return 0; } static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) { @@ -1253,7 +1254,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { STableInfoReq infoReq = {0}; STableMetaRsp metaRsp = {0}; - if (tSerializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) { + if (tDeserializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto RETRIEVE_META_OVER; } @@ -1269,7 +1270,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { goto RETRIEVE_META_OVER; } - void *pRsp = malloc(rspLen); + void *pRsp = rpcMallocCont(rspLen); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto RETRIEVE_META_OVER; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 9d81a17f816c5bf6be5b76d05df49844b8eb390c..7999edbcff5a3951e0bf91903028c22221271d22 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -34,7 +34,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); -static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg); +static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq); static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); @@ -334,11 +334,16 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - STableInfoReq *pInfo = pMsg->rpcMsg.pCont; +static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + STableInfoReq infoReq = {0}; + + if (tSerializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } - mDebug("topic:%s, start to retrieve meta", pInfo->tbName); + mDebug("topic:%s, start to retrieve meta", infoReq.tbName); #if 0 SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname); diff --git a/source/dnode/mnode/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp index 4ed13b6f1c67c1a7351cdc0069e31dea1c4d2696..573c1f4846e386b7984919c3563c63254f613ed2 100644 --- a/source/dnode/mnode/impl/test/stb/stb.cpp +++ b/source/dnode/mnode/impl/test/stb/stb.cpp @@ -339,10 +339,13 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { // ----- meta ------ { - int32_t contLen = sizeof(STableInfoReq); - STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen); - strcpy(pReq->dbFName, dbname); - strcpy(pReq->tbName, "stb"); + STableInfoReq infoReq = {0}; + strcpy(infoReq.dbFName, dbname); + strcpy(infoReq.tbName, "stb"); + + int32_t contLen = tSerializeSTableInfoReq(NULL, 0, &infoReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSTableInfoReq(pReq, contLen, &infoReq); SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen); ASSERT_NE(pMsg, nullptr); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index ddf2a0594cb435c17a42247d2d6721c80b992c7a..f779949f14caf53f8205633a3ea0779b2ec214bd 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -71,7 +71,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { } static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { - STableInfoReq * pReq = (STableInfoReq *)(pMsg->pCont); STbCfg * pTbCfg = NULL; STbCfg * pStbCfg = NULL; tb_uid_t uid; @@ -79,12 +78,19 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { int32_t nTagCols; SSchemaWrapper *pSW = NULL; STableMetaRsp *pTbMetaMsg = NULL; - SSchema * pTagSchema; + STableMetaRsp metaRsp = {0}; + SSchema *pTagSchema; SRpcMsg rpcMsg; int msgLen = 0; int32_t code = TSDB_CODE_VND_APP_ERROR; - pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tbName, &uid); + STableInfoReq infoReq = {0}; + if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + pTbCfg = metaGetTbInfoByName(pVnode->pMeta, infoReq.tbName, &uid); if (pTbCfg == NULL) { code = TSDB_CODE_VND_TB_NOT_EXIST; goto _exit; @@ -114,16 +120,15 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { pTagSchema = NULL; } - STableMetaRsp metaRsp = {0}; - metaRsp.pSchemas = malloc( sizeof(SSchema) * (nCols + nTagCols)); + metaRsp.pSchemas = calloc(nCols + nTagCols, sizeof(SSchema)); if (metaRsp.pSchemas == NULL) { code = TSDB_CODE_VND_OUT_OF_MEMORY; goto _exit; } metaRsp.dbId = htobe64(pVnode->config.dbId); - memcpy(metaRsp.dbFName, pReq->dbFName, sizeof(metaRsp.dbFName)); - strcpy(metaRsp.tbName, pReq->tbName); + memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName)); + strcpy(metaRsp.tbName, infoReq.tbName); if (pTbCfg->type == META_CHILD_TABLE) { strcpy(metaRsp.stbName, pStbCfg->name); metaRsp.suid = pTbCfg->ctbCfg.suid; @@ -148,11 +153,12 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { goto _exit; } - void *pRsp = malloc(rspLen); + void *pRsp = rpcMallocCont(rspLen); if (pRsp == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } + tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp); code = 0; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 0bb24244c51e8227f6a2db97aadb56d9324ca0fa..753ac75860539574f6691dea3e500c241f7dc723 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -30,40 +30,33 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3 return TSDB_CODE_TSC_INVALID_INPUT; } - int32_t estimateSize = sizeof(STableInfoReq); - if (NULL == *msg || msgSize < estimateSize) { - tfree(*msg); - *msg = rpcMallocCont(estimateSize); - if (NULL == *msg) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - } - - STableInfoReq *bMsg = (STableInfoReq *)*msg; - - bMsg->header.vgId = htonl(pInput->vgId); - + STableInfoReq infoReq = {0}; + infoReq.header.vgId = pInput->vgId; if (pInput->dbFName) { - tstrncpy(bMsg->dbFName, pInput->dbFName, tListLen(bMsg->dbFName)); + tstrncpy(infoReq.dbFName, pInput->dbFName, TSDB_DB_FNAME_LEN); } + tstrncpy(infoReq.tbName, pInput->tbName, TSDB_TABLE_NAME_LEN); + + int32_t bufLen = tSerializeSTableInfoReq(NULL, 0, &infoReq); + void *pBuf = rpcMallocCont(bufLen); + tSerializeSTableInfoReq(pBuf, bufLen, &infoReq); - tstrncpy(bMsg->tbName, pInput->tbName, tListLen(bMsg->tbName)); + *msg = pBuf; + *msgLen = bufLen; - *msgLen = (int32_t)sizeof(*bMsg); return TSDB_CODE_SUCCESS; } int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { - if (NULL == input || NULL == msg || NULL == msgLen) { + SBuildUseDBInput *pInput = input; + if (NULL == pInput || NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } - SBuildUseDBInput *bInput = input; - SUseDbReq usedbReq = {0}; - strncpy(usedbReq.db, bInput->db, sizeof(usedbReq.db)); + strncpy(usedbReq.db, pInput->db, sizeof(usedbReq.db)); usedbReq.db[sizeof(usedbReq.db) - 1] = 0; - usedbReq.vgVersion = bInput->vgVersion; + usedbReq.vgVersion = pInput->vgVersion; int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); void *pBuf = rpcMallocCont(bufLen); @@ -99,15 +92,14 @@ int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { memcpy(pOut->db, usedbRsp.db, TSDB_DB_FNAME_LEN); pOut->dbId = usedbRsp.uid; - pOut->dbVgroup->vgVersion = usedbRsp.vgVersion; - pOut->dbVgroup->hashMethod = usedbRsp.hashMethod; - pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo)); if (NULL == pOut->dbVgroup) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto PROCESS_USEDB_OVER; } + pOut->dbVgroup->vgVersion = usedbRsp.vgVersion; + pOut->dbVgroup->hashMethod = usedbRsp.hashMethod; pOut->dbVgroup->vgHash = taosHashInit(usedbRsp.vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == pOut->dbVgroup->vgHash) { diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 76d3b7cdd0d6dbfb3e9a131b8838677ded6e55c3..a1b431fd7d92b3b6f1dcb41f6ecb136315527c53 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -172,45 +172,54 @@ int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) { } int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { - int32_t numOfCols = 6; - int32_t msgSize = sizeof(SVShowTablesRsp) + sizeof(SSchema) * numOfCols; - - SVShowTablesRsp *pRsp = (SVShowTablesRsp *)rpcMallocCont(msgSize); + int32_t numOfCols = 6; + SVShowTablesRsp showRsp = {0}; + + // showRsp.showId = 1; + showRsp.tableMeta.pSchemas = calloc(numOfCols, sizeof(SSchema)); + if (showRsp.tableMeta.pSchemas == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } int32_t cols = 0; - SSchema *pSchema = pRsp->metaInfo.pSchemas; + SSchema *pSchema = showRsp.tableMeta.pSchemas; const SSchema *s = tGetTbnameColumnSchema(); - *pSchema = createSchema(s->type, htonl(s->bytes), htonl(++cols), "name"); + *pSchema = createSchema(s->type, s->bytes, ++cols, "name"); pSchema++; int32_t type = TSDB_DATA_TYPE_TIMESTAMP; - *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "created"); + *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "created"); pSchema++; type = TSDB_DATA_TYPE_SMALLINT; - *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "columns"); + *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "columns"); pSchema++; - *pSchema = createSchema(s->type, htonl(s->bytes), htonl(++cols), "stable"); + *pSchema = createSchema(s->type, s->bytes, ++cols, "stable"); pSchema++; type = TSDB_DATA_TYPE_BIGINT; - *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "uid"); + *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "uid"); pSchema++; type = TSDB_DATA_TYPE_INT; - *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "vgId"); + *pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "vgId"); assert(cols == numOfCols); - pRsp->metaInfo.numOfColumns = htonl(cols); + showRsp.tableMeta.numOfColumns = cols; + + int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp); + void *pBuf = rpcMallocCont(bufLen); + tSerializeSShowRsp(pBuf, bufLen, &showRsp); SRpcMsg rpcMsg = { - .handle = pMsg->handle, + .handle = pMsg->handle, .ahandle = pMsg->ahandle, - .pCont = pRsp, - .contLen = msgSize, - .code = code, + .pCont = pBuf, + .contLen = bufLen, + .code = code, }; rpcSendResponse(&rpcMsg);