提交 d31eee30 编写于 作者: G Ganlin Zhao

[TD-5623]<feature>: add server code for decompressing col data

上级 1a3cf2e5
...@@ -303,6 +303,7 @@ typedef struct { ...@@ -303,6 +303,7 @@ typedef struct {
int16_t numOfCols; int16_t numOfCols;
int16_t precision; int16_t precision;
bool completed; bool completed;
bool compressed;
int32_t code; int32_t code;
int32_t numOfGroups; int32_t numOfGroups;
SResRec * pGroupRec; SResRec * pGroupRec;
......
...@@ -2678,6 +2678,27 @@ int tscProcessQueryRsp(SSqlObj *pSql) { ...@@ -2678,6 +2678,27 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
return 0; return 0;
} }
static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed) {
int32_t decompLen = 0;
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset));
char *p = outputBuf;
int32_t bufOffset = 0, compSize = 0;
for(int32_t i = 0; i < numOfCols; ++i) {
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
bufOffset = pInfo->field.bytes * pRes->numOfRows;
int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(data, compSize, pRes->numOfRows, p, bufOffset,
compressed, NULL, 0);
p += flen;
decompLen +=flen;
}
tfree(outputBuf);
}
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -2690,18 +2711,24 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2690,18 +2711,24 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
return pRes->code; return pRes->code;
} }
pRes->numOfRows = htonl(pRetrieve->numOfRows); pRes->numOfRows = htonl(pRetrieve->numOfRows);
pRes->precision = htons(pRetrieve->precision); pRes->precision = htons(pRetrieve->precision);
pRes->offset = htobe64(pRetrieve->offset); pRes->offset = htobe64(pRetrieve->offset);
pRes->useconds = htobe64(pRetrieve->useconds); pRes->useconds = htobe64(pRetrieve->useconds);
pRes->completed = (pRetrieve->completed == 1); pRes->completed = (pRetrieve->completed == 1);
pRes->data = pRetrieve->data; pRes->compressed = (pRetrieve->compressed == 1);
pRes->data = pRetrieve->data;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code; return pRes->code;
} }
//Decompress col data if compressed from server
if (pRes->compressed) {
decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed);
}
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if ((pCmd->command == TSDB_SQL_RETRIEVE) || if ((pCmd->command == TSDB_SQL_RETRIEVE) ||
((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) &&
......
...@@ -4207,21 +4207,30 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -4207,21 +4207,30 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
SSDataBlock* pRes = pRuntimeEnv->outputBuf; SSDataBlock* pRes = pRuntimeEnv->outputBuf;
int32_t *compSizes = NULL;
int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
if (compressed) {
compSizes = tmalloc(numOfCols);
}
if (pQueryAttr->pExpr2 == NULL) { if (pQueryAttr->pExpr2 == NULL) {
for (int32_t col = 0; col < pQueryAttr->numOfOutput; ++col) { for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
if (compressed) { if (compressed) {
*compLen += compressQueryColData(pColRes, pRes->info.rows, data, compressed); compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed);
*compLen += compSizes[col];
} else { } else {
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows);
data += pColRes->info.bytes * pRes->info.rows; data += pColRes->info.bytes * pRes->info.rows;
} }
} }
} else { } else {
for (int32_t col = 0; col < pQueryAttr->numOfExpr2; ++col) { for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
if (compressed) { if (compressed) {
*compLen += compressQueryColData(pColRes, numOfRows, data, compressed); compSizes[col] = compressQueryColData(pColRes, numOfRows, data, compressed);
*compLen += compSizes[col];
} else { } else {
memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows);
data += pColRes->info.bytes * numOfRows; data += pColRes->info.bytes * numOfRows;
...@@ -4229,6 +4238,13 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -4229,6 +4238,13 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
} }
} }
if (compressed) {
memmove(data, (char *)compSizes, numOfCols * sizeof(int32_t));
data += numOfCols * sizeof(int32_t);
tfree(compSizes);
}
int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap); int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables); *(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t); data += sizeof(int32_t);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册