From 8271c508899c6146be602527006b59079895f3c2 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 11 Aug 2021 21:42:59 +0800 Subject: [PATCH] [TD-5623]: append compressed sizes for each col afterdata --- src/client/src/tscServer.c | 14 ++++++++++---- src/inc/taosmsg.h | 1 + src/query/src/qExecutor.c | 2 +- src/query/src/queryMain.c | 4 +++- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 399a47c4d6..f270d7b849 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2678,25 +2678,30 @@ int tscProcessQueryRsp(SSqlObj *pSql) { return 0; } -static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed) { +static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed, int compLen) { int32_t decompLen = 0; int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; + int32_t *compSizes = tcalloc(numOfCols, sizeof(int32_t)); + char *pData = data; + compSizes = (int32_t *)(pData + compLen); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset)); char *p = outputBuf; - int32_t bufOffset = 0, compSize = 0; + int32_t bufOffset = 0; for(int32_t i = 0; i < numOfCols; ++i) { SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); bufOffset = pInfo->field.bytes * pRes->numOfRows; - int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(data, compSize, pRes->numOfRows, p, bufOffset, + int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(pData, compSizes[i], pRes->numOfRows, p, bufOffset, compressed, NULL, 0); p += flen; decompLen +=flen; + pData += compSizes[i]; } tfree(outputBuf); + tfree(compSizes); } int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { @@ -2726,7 +2731,8 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { //Decompress col data if compressed from server if (pRes->compressed) { - decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed); + int32_t compLen = htonl(pRetrieve->compLen); + decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed, compLen); } STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index d8b94b3da1..8f5269c158 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -537,6 +537,7 @@ typedef struct SRetrieveTableRsp { int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; int8_t compressed; + int32_t compLen; char data[]; } SRetrieveTableRsp; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e687090106..40698dad20 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4211,7 +4211,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; if (compressed) { - compSizes = tmalloc(numOfCols); + compSizes = tcalloc(numOfCols, sizeof(int32_t)); } if (pQueryAttr->pExpr2 == NULL) { diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index c1cd431ef4..84aa4973a0 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -366,9 +366,11 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } if ((*pRsp)->compressed && compLen != 0) { - *contLen = *contLen - pQueryAttr->resultRowSize * s + compLen; + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + *contLen = *contLen - pQueryAttr->resultRowSize * s + compLen + numOfCols * sizeof(int32_t); *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen); } + (*pRsp)->compLen = htonl(compLen); pQInfo->rspContext = NULL; pQInfo->dataReady = QUERY_RESULT_NOT_READY; -- GitLab