From 1a3cf2e5059900088c7268d6273344b412e775d9 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 6 Aug 2021 09:32:07 +0800 Subject: [PATCH] [TD-5623]: add compression code on server side --- src/inc/taosmsg.h | 1 + src/query/inc/qExecutor.h | 5 ++++- src/query/src/qExecutor.c | 30 +++++++++++++++++++++++------- src/query/src/queryMain.c | 10 +++++++++- 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 837db72274..d8b94b3da1 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -536,6 +536,7 @@ typedef struct SRetrieveTableRsp { int16_t precision; int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; + int8_t compressed; char data[]; } SRetrieveTableRsp; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 84f00891e5..9fb7644cda 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -43,6 +43,9 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) +#define QUERY_COMP_THRESHOLD 60 +#define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0) + enum { // when query starts to execute, this status will set QUERY_NOT_COMPLETED = 0x1u, @@ -647,7 +650,7 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo); bool isValidQInfo(void *param); -int32_t doDumpQueryResult(SQInfo *pQInfo, char *data); +int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen); size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); void setQueryKilled(SQInfo *pQInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d799d56cea..71d8d24389 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4193,7 +4193,15 @@ static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFuncti } } -static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { +static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { + int32_t colLen = pColRes->info.bytes * numOfRows; + int32_t colCompLen = (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colLen, numOfRows, data, + colLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0); + data += colCompLen; + return colCompLen; +} + +static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) { SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -4202,14 +4210,22 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data if (pQueryAttr->pExpr2 == NULL) { for (int32_t col = 0; col < pQueryAttr->numOfOutput; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); - memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); - data += pColRes->info.bytes * pRes->info.rows; + if (compressed) { + *compLen += compressQueryColData(pColRes, pRes->info.rows, data, compressed); + } else { + memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); + data += pColRes->info.bytes * pRes->info.rows; + } } } else { for (int32_t col = 0; col < pQueryAttr->numOfExpr2; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); - memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); - data += pColRes->info.bytes * numOfRows; + if (compressed) { + *compLen += compressQueryColData(pColRes, numOfRows, data, compressed); + } else { + memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); + data += pColRes->info.bytes * numOfRows; + } } } @@ -8695,7 +8711,7 @@ void freeQInfo(SQInfo *pQInfo) { tfree(pQInfo); } -int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { +int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) { // the remained number of retrieved rows, not the interpolated result SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; @@ -8738,7 +8754,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { setQueryStatus(pRuntimeEnv, QUERY_OVER); } } else { - doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data); + doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data, compressed, compLen); } qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId, diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 1a9c057ef0..c1cd431ef4 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -324,6 +324,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) { SQInfo *pQInfo = (SQInfo *)qinfo; + int32_t compLen = 0; if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_QRY_INVALID_QHANDLE; @@ -356,12 +357,19 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->precision = htons(pQueryAttr->precision); + (*pRsp)->compressed = NEEDTO_COMPRESS_QUERY(pQueryAttr->resultRowSize * s); + if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { - doDumpQueryResult(pQInfo, (*pRsp)->data); + doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen); } else { setQueryStatus(pRuntimeEnv, QUERY_OVER); } + if ((*pRsp)->compressed && compLen != 0) { + *contLen = *contLen - pQueryAttr->resultRowSize * s + compLen; + *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen); + } + pQInfo->rspContext = NULL; pQInfo->dataReady = QUERY_RESULT_NOT_READY; -- GitLab