提交 8271c508 编写于 作者: G Ganlin Zhao

[TD-5623]<feature>: append compressed sizes for each col afterdata

上级 d31eee30
...@@ -2678,25 +2678,30 @@ int tscProcessQueryRsp(SSqlObj *pSql) { ...@@ -2678,25 +2678,30 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
return 0; return 0;
} }
static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed) { static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed, int compLen) {
int32_t decompLen = 0; int32_t decompLen = 0;
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
int32_t *compSizes = tcalloc(numOfCols, sizeof(int32_t));
char *pData = data;
compSizes = (int32_t *)(pData + compLen);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset)); char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset));
char *p = outputBuf; char *p = outputBuf;
int32_t bufOffset = 0, compSize = 0; int32_t bufOffset = 0;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
bufOffset = pInfo->field.bytes * pRes->numOfRows; bufOffset = pInfo->field.bytes * pRes->numOfRows;
int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(data, compSize, pRes->numOfRows, p, bufOffset, int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(pData, compSizes[i], pRes->numOfRows, p, bufOffset,
compressed, NULL, 0); compressed, NULL, 0);
p += flen; p += flen;
decompLen +=flen; decompLen +=flen;
pData += compSizes[i];
} }
tfree(outputBuf); tfree(outputBuf);
tfree(compSizes);
} }
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
...@@ -2726,7 +2731,8 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2726,7 +2731,8 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
//Decompress col data if compressed from server //Decompress col data if compressed from server
if (pRes->compressed) { if (pRes->compressed) {
decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed); int32_t compLen = htonl(pRetrieve->compLen);
decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed, compLen);
} }
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......
...@@ -537,6 +537,7 @@ typedef struct SRetrieveTableRsp { ...@@ -537,6 +537,7 @@ typedef struct SRetrieveTableRsp {
int64_t offset; // updated offset value for multi-vnode projection query int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds; int64_t useconds;
int8_t compressed; int8_t compressed;
int32_t compLen;
char data[]; char data[];
} SRetrieveTableRsp; } SRetrieveTableRsp;
......
...@@ -4211,7 +4211,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -4211,7 +4211,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
if (compressed) { if (compressed) {
compSizes = tmalloc(numOfCols); compSizes = tcalloc(numOfCols, sizeof(int32_t));
} }
if (pQueryAttr->pExpr2 == NULL) { if (pQueryAttr->pExpr2 == NULL) {
......
...@@ -366,9 +366,11 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -366,9 +366,11 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
} }
if ((*pRsp)->compressed && compLen != 0) { if ((*pRsp)->compressed && compLen != 0) {
*contLen = *contLen - pQueryAttr->resultRowSize * s + compLen; int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
*contLen = *contLen - pQueryAttr->resultRowSize * s + compLen + numOfCols * sizeof(int32_t);
*pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen); *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen);
} }
(*pRsp)->compLen = htonl(compLen);
pQInfo->rspContext = NULL; pQInfo->rspContext = NULL;
pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->dataReady = QUERY_RESULT_NOT_READY;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册