diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d30ee32d67c2a9294b3a3aa0b2bc9144829a364c..657d8b78481d20813edabef254285121722aa2bf 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -13,7 +13,10 @@ * along with this program. If not, see . */ +#include #include "os.h" +#include "qPlan.h" +#include "qTableMeta.h" #include "tcmdtype.h" #include "tlockfree.h" #include "trpc.h" @@ -21,10 +24,8 @@ #include "tscLog.h" #include "tscProfile.h" #include "tscUtil.h" -#include "qTableMeta.h" #include "tsclient.h" #include "ttimer.h" -#include "qPlan.h" int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; @@ -2048,16 +2049,27 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { } SSqlCmd *pParentCmd = &pParentSql->cmd; - SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); char* pMsg = pMultiMeta->meta; + char* buf = NULL; + if (pMultiMeta->compressed) { + buf = malloc(pMultiMeta->rawLen - sizeof(SMultiTableMeta)); + int32_t len = tsDecompressString(pMultiMeta->meta, pMultiMeta->contLen - sizeof(SMultiTableMeta), 1, + buf, pMultiMeta->rawLen - sizeof(SMultiTableMeta), ONE_STAGE_COMP, NULL, 0); + assert(len == pMultiMeta->rawLen - sizeof(SMultiTableMeta)); + + pMsg = buf; + } + for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) { STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg; int32_t code = tableMetaMsgConvert(pMetaMsg); if (code != TSDB_CODE_SUCCESS) { taosHashCleanup(pSet); taosReleaseRef(tscObjRef, pParentSql->self); + + tfree(buf); return code; } @@ -2066,6 +2078,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, pMetaMsg->tableFname); taosHashCleanup(pSet); taosReleaseRef(tscObjRef, pParentSql->self); + + tfree(buf); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -2115,6 +2129,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { taosHashCleanup(pSet); taosReleaseRef(tscObjRef, pParentSql->self); + + tfree(buf); return TSDB_CODE_SUCCESS; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index f20e1535bab725532bf8785cabdbe186448acc15..4e76b6dcc1fb0682e69de5a0e7e0f35440e7e40b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -760,10 +760,12 @@ typedef struct STableMetaMsg { } STableMetaMsg; typedef struct SMultiTableMeta { - int32_t numOfTables; - int32_t numOfVgroup; - int32_t contLen; - char meta[]; + int32_t numOfTables; + int32_t numOfVgroup; + uint32_t contLen:31; + uint8_t compressed:1; // denote if compressed or not + uint32_t rawLen; // size before compress + char meta[]; } SMultiTableMeta; typedef struct { diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index beeff372aa75a34c4be1857782a76c2426748140..ea5611e683054ae084be61751225419834931dac 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -2892,7 +2892,7 @@ static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray (*totalMallocLen) *= 2; } - pMultiMeta = rpcReallocCont(pMultiMeta, *totalMallocLen); + pMultiMeta = realloc(pMultiMeta, *totalMallocLen); if (pMultiMeta == NULL) { return NULL; } @@ -2923,8 +2923,8 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { } // first malloc 80KB, subsequent reallocation will expand the size as twice of the original size - int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16); - pMultiMeta = rpcMallocCont(totalMallocLen); + int32_t totalMallocLen = sizeof(SMultiTableMeta) + sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16); + pMultiMeta = calloc(1, totalMallocLen); if (pMultiMeta == NULL) { code = TSDB_CODE_MND_OUT_OF_MEMORY; goto _end; @@ -2957,7 +2957,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { int remain = totalMallocLen - pMultiMeta->contLen; if (remain <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) { totalMallocLen *= 2; - pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen); + pMultiMeta = realloc(pMultiMeta, totalMallocLen); if (pMultiMeta == NULL) { mnodeDecTableRef(pMsg->pTable); code = TSDB_CODE_MND_OUT_OF_MEMORY; @@ -3027,16 +3027,36 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { pMsg->rpcRsp.len = pMultiMeta->contLen; code = TSDB_CODE_SUCCESS; + char* tmp = rpcMallocCont(pMultiMeta->contLen + 2); + int32_t len = tsCompressString(pMultiMeta->meta, (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta), 1, + tmp + sizeof(SMultiTableMeta), (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta) + 2, ONE_STAGE_COMP, NULL, 0); + + pMultiMeta->rawLen = pMultiMeta->contLen; + if (len == -1 || len + sizeof(SMultiTableMeta) >= pMultiMeta->contLen + 2) { // compress failed, do not compress this binary data + pMultiMeta->compressed = 0; + memcpy(tmp, pMultiMeta, sizeof(SMultiTableMeta) + pMultiMeta->contLen); + } else { + pMultiMeta->compressed = 1; + pMultiMeta->contLen = sizeof(SMultiTableMeta) + len; + + // copy the header and the compressed payload + memcpy(tmp, pMultiMeta, sizeof(SMultiTableMeta)); + } + + pMsg->rpcRsp.rsp = tmp; + pMsg->rpcRsp.len = pMultiMeta->contLen; + + SMultiTableMeta* p = (SMultiTableMeta*) tmp; + + mDebug("multiTable info build completed, original:%d, compressed:%d, comp:%d", p->rawLen, p->contLen, p->compressed); + _end: tfree(str); tfree(nameList); taosArrayDestroy(pList); pMsg->pTable = NULL; pMsg->pVgroup = NULL; - - if (code != TSDB_CODE_SUCCESS) { - rpcFreeCont(pMultiMeta); - } + tfree(pMultiMeta); return code; }