From d31eee309a608224431e6386808e3d272a6891e1 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 11 Aug 2021 18:42:03 +0800 Subject: [PATCH] [TD-5623]: add server code for decompressing col data --- src/client/inc/tsclient.h | 1 + src/client/src/tscServer.c | 39 ++++++++++++++++++++++++++++++++------ src/query/src/qExecutor.c | 24 +++++++++++++++++++---- 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b6821de87a..b025ca17c3 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -303,6 +303,7 @@ typedef struct { int16_t numOfCols; int16_t precision; bool completed; + bool compressed; int32_t code; int32_t numOfGroups; SResRec * pGroupRec; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 38927be4ac..399a47c4d6 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2678,6 +2678,27 @@ int tscProcessQueryRsp(SSqlObj *pSql) { return 0; } +static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed) { + int32_t decompLen = 0; + int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; + + TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); + int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); + char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset)); + + char *p = outputBuf; + int32_t bufOffset = 0, compSize = 0; + for(int32_t i = 0; i < numOfCols; ++i) { + SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); + bufOffset = pInfo->field.bytes * pRes->numOfRows; + int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(data, compSize, pRes->numOfRows, p, bufOffset, + compressed, NULL, 0); + p += flen; + decompLen +=flen; + } + tfree(outputBuf); +} + int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -2690,18 +2711,24 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { return pRes->code; } - pRes->numOfRows = htonl(pRetrieve->numOfRows); - pRes->precision = htons(pRetrieve->precision); - pRes->offset = htobe64(pRetrieve->offset); - pRes->useconds = htobe64(pRetrieve->useconds); - pRes->completed = (pRetrieve->completed == 1); - pRes->data = pRetrieve->data; + pRes->numOfRows = htonl(pRetrieve->numOfRows); + pRes->precision = htons(pRetrieve->precision); + pRes->offset = htobe64(pRetrieve->offset); + pRes->useconds = htobe64(pRetrieve->useconds); + pRes->completed = (pRetrieve->completed == 1); + pRes->compressed = (pRetrieve->compressed == 1); + pRes->data = pRetrieve->data; SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { return pRes->code; } + //Decompress col data if compressed from server + if (pRes->compressed) { + decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed); + } + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if ((pCmd->command == TSDB_SQL_RETRIEVE) || ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 71d8d24389..e687090106 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4207,21 +4207,30 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data SSDataBlock* pRes = pRuntimeEnv->outputBuf; + int32_t *compSizes = NULL; + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + + if (compressed) { + compSizes = tmalloc(numOfCols); + } + if (pQueryAttr->pExpr2 == NULL) { - for (int32_t col = 0; col < pQueryAttr->numOfOutput; ++col) { + for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - *compLen += compressQueryColData(pColRes, pRes->info.rows, data, compressed); + compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed); + *compLen += compSizes[col]; } else { memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); data += pColRes->info.bytes * pRes->info.rows; } } } else { - for (int32_t col = 0; col < pQueryAttr->numOfExpr2; ++col) { + for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - *compLen += compressQueryColData(pColRes, numOfRows, data, compressed); + compSizes[col] = compressQueryColData(pColRes, numOfRows, data, compressed); + *compLen += compSizes[col]; } else { memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); data += pColRes->info.bytes * numOfRows; @@ -4229,6 +4238,13 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } + if (compressed) { + memmove(data, (char *)compSizes, numOfCols * sizeof(int32_t)); + data += numOfCols * sizeof(int32_t); + + tfree(compSizes); + } + int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap); *(int32_t*)data = htonl(numOfTables); data += sizeof(int32_t); -- GitLab