diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b6821de87aed8831d4a8d7dd7bc8c46ac8fde551..b025ca17c3fc597c077b77e3dbdc070dee26e229 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -303,6 +303,7 @@ typedef struct { int16_t numOfCols; int16_t precision; bool completed; + bool compressed; int32_t code; int32_t numOfGroups; SResRec * pGroupRec; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 38927be4ac1ab927dea17b02ebfee2db9d2cad7a..399a47c4d668006d52bf5e48f4890e140a457bce 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2678,6 +2678,27 @@ int tscProcessQueryRsp(SSqlObj *pSql) { return 0; } +static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed) { + int32_t decompLen = 0; + int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; + + TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); + int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); + char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset)); + + char *p = outputBuf; + int32_t bufOffset = 0, compSize = 0; + for(int32_t i = 0; i < numOfCols; ++i) { + SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); + bufOffset = pInfo->field.bytes * pRes->numOfRows; + int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(data, compSize, pRes->numOfRows, p, bufOffset, + compressed, NULL, 0); + p += flen; + decompLen +=flen; + } + tfree(outputBuf); +} + int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -2690,18 +2711,24 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { return pRes->code; } - pRes->numOfRows = htonl(pRetrieve->numOfRows); - pRes->precision = htons(pRetrieve->precision); - pRes->offset = htobe64(pRetrieve->offset); - pRes->useconds = htobe64(pRetrieve->useconds); - pRes->completed = (pRetrieve->completed == 1); - pRes->data = pRetrieve->data; + pRes->numOfRows = htonl(pRetrieve->numOfRows); + pRes->precision = htons(pRetrieve->precision); + pRes->offset = htobe64(pRetrieve->offset); + pRes->useconds = htobe64(pRetrieve->useconds); + pRes->completed = (pRetrieve->completed == 1); + pRes->compressed = (pRetrieve->compressed == 1); + pRes->data = pRetrieve->data; SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { return pRes->code; } + //Decompress col data if compressed from server + if (pRes->compressed) { + decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed); + } + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if ((pCmd->command == TSDB_SQL_RETRIEVE) || ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 71d8d243891aecdc0d2d63cc0412a8be1eace10a..e6870901069359273e7eb353b171e54425dee2a9 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4207,21 +4207,30 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data SSDataBlock* pRes = pRuntimeEnv->outputBuf; + int32_t *compSizes = NULL; + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + + if (compressed) { + compSizes = tmalloc(numOfCols); + } + if (pQueryAttr->pExpr2 == NULL) { - for (int32_t col = 0; col < pQueryAttr->numOfOutput; ++col) { + for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - *compLen += compressQueryColData(pColRes, pRes->info.rows, data, compressed); + compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed); + *compLen += compSizes[col]; } else { memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); data += pColRes->info.bytes * pRes->info.rows; } } } else { - for (int32_t col = 0; col < pQueryAttr->numOfExpr2; ++col) { + for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - *compLen += compressQueryColData(pColRes, numOfRows, data, compressed); + compSizes[col] = compressQueryColData(pColRes, numOfRows, data, compressed); + *compLen += compSizes[col]; } else { memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); data += pColRes->info.bytes * numOfRows; @@ -4229,6 +4238,13 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } + if (compressed) { + memmove(data, (char *)compSizes, numOfCols * sizeof(int32_t)); + data += numOfCols * sizeof(int32_t); + + tfree(compSizes); + } + int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap); *(int32_t*)data = htonl(numOfTables); data += sizeof(int32_t);