未验证 提交 8529e74c 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #7376 from taosdata/feature/TD-5623

[TD-5623]<feature>: add retrieved column data compression mechanism
......@@ -2678,6 +2678,50 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
return 0;
}
static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char **data, int8_t compressed, int compLen) {
int32_t decompLen = 0;
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
int32_t *compSizes;
char *pData = *data;
compSizes = (int32_t *)(pData + compLen);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset));
char *p = outputBuf;
int32_t bufOffset;
for (int32_t i = 0; i < numOfCols; ++i) {
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
bufOffset = pInfo->field.bytes * pRes->numOfRows;
int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(pData, htonl(compSizes[i]), pRes->numOfRows, p, bufOffset,
compressed, NULL, 0);
p += flen;
decompLen +=flen;
pData += htonl(compSizes[i]);
}
/* Resize rsp as decompressed data will occupy more space */
pRes->rspLen = pRes->rspLen - (compLen + numOfCols * sizeof(int32_t)) + decompLen;
char *new_rsp = (char *)realloc(pRes->pRsp, pRes->rspLen);
if (new_rsp == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return;
} else {
pRes->pRsp = new_rsp;
*data = ((SRetrieveTableRsp *)pRes->pRsp)->data;
pData = *data + compLen + numOfCols * sizeof(int32_t);
}
int32_t tailLen = pRes->rspLen - sizeof(SRetrieveTableRsp) - decompLen;
memmove(*data + decompLen, pData, tailLen);
memmove(*data, outputBuf, decompLen);
tfree(outputBuf);
}
int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
......@@ -2690,18 +2734,24 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
return pRes->code;
}
pRes->numOfRows = htonl(pRetrieve->numOfRows);
pRes->precision = htons(pRetrieve->precision);
pRes->offset = htobe64(pRetrieve->offset);
pRes->useconds = htobe64(pRetrieve->useconds);
pRes->completed = (pRetrieve->completed == 1);
pRes->data = pRetrieve->data;
pRes->numOfRows = htonl(pRetrieve->numOfRows);
pRes->precision = htons(pRetrieve->precision);
pRes->offset = htobe64(pRetrieve->offset);
pRes->useconds = htobe64(pRetrieve->useconds);
pRes->completed = (pRetrieve->completed == 1);
pRes->data = pRetrieve->data;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
//Decompress col data if compressed from server
if (pRetrieve->compressed) {
int32_t compLen = htonl(pRetrieve->compLen);
decompressQueryColData(pRes, pQueryInfo, &pRes->data, pRetrieve->compressed, compLen);
}
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if ((pCmd->command == TSDB_SQL_RETRIEVE) ||
((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) &&
......@@ -2714,10 +2764,10 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;
int32_t numOfTables = htonl(*(int32_t*)p);
......
......@@ -60,6 +60,7 @@ extern char tsLocale[];
extern char tsCharset[]; // default encode string
extern int8_t tsEnableCoreFile;
extern int32_t tsCompressMsgSize;
extern int32_t tsCompressColData;
extern int32_t tsMaxNumOfDistinctResults;
extern char tsTempDir[];
......
......@@ -75,6 +75,14 @@ int32_t tsMaxBinaryDisplayWidth = 30;
*/
int32_t tsCompressMsgSize = -1;
/* denote if server needs to compress the retrieved column data before adding to the rpc response message body.
* 0: disable column data compression
* 1: enable column data compression
* This option is default to disabled. Once enabled, compression will be conducted if any column has size more
* than QUERY_COMP_THRESHOLD. Otherwise, no further compression is needed.
*/
int32_t tsCompressColData = 0;
// client
int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN;
int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_MAX_LEN;
......@@ -991,6 +999,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "compressColData";
cfg.ptr = &tsCompressColData;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "maxSQLLength";
cfg.ptr = &tsMaxSQLStringLen;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......
......@@ -536,6 +536,8 @@ typedef struct SRetrieveTableRsp {
int16_t precision;
int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds;
int8_t compressed;
int32_t compLen;
char data[];
} SRetrieveTableRsp;
......
......@@ -43,6 +43,10 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows)
//TODO: may need to fine tune this threshold
#define QUERY_COMP_THRESHOLD (1024 * 512)
#define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0)
enum {
// when query starts to execute, this status will set
QUERY_NOT_COMPLETED = 0x1u,
......@@ -638,6 +642,7 @@ int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
bool isQueryKilled(SQInfo *pQInfo);
int32_t checkForQueryBuf(size_t numOfTables);
bool checkNeedToCompressQueryCol(SQInfo *pQInfo);
bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status);
......@@ -646,7 +651,7 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo);
bool isValidQInfo(void *param);
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen);
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo);
......
......@@ -4190,26 +4190,60 @@ static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFuncti
}
}
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
int32_t colSize = pColRes->info.bytes * numOfRows;
return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data,
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SSDataBlock* pRes = pRuntimeEnv->outputBuf;
int32_t *compSizes = NULL;
int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
if (compressed) {
compSizes = tcalloc(numOfCols, sizeof(int32_t));
}
if (pQueryAttr->pExpr2 == NULL) {
for (int32_t col = 0; col < pQueryAttr->numOfOutput; ++col) {
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows);
data += pColRes->info.bytes * pRes->info.rows;
if (compressed) {
compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed);
data += compSizes[col];
*compLen += compSizes[col];
compSizes[col] = htonl(compSizes[col]);
} else {
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows);
data += pColRes->info.bytes * pRes->info.rows;
}
}
} else {
for (int32_t col = 0; col < pQueryAttr->numOfExpr2; ++col) {
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows);
data += pColRes->info.bytes * numOfRows;
if (compressed) {
compSizes[col] = htonl(compressQueryColData(pColRes, numOfRows, data, compressed));
data += compSizes[col];
*compLen += compSizes[col];
compSizes[col] = htonl(compSizes[col]);
} else {
memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows);
data += pColRes->info.bytes * numOfRows;
}
}
}
if (compressed) {
memmove(data, (char *)compSizes, numOfCols * sizeof(int32_t));
data += numOfCols * sizeof(int32_t);
tfree(compSizes);
}
int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
......@@ -8692,7 +8726,7 @@ void freeQInfo(SQInfo *pQInfo) {
tfree(pQInfo);
}
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) {
// the remained number of retrieved rows, not the interpolated result
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
......@@ -8735,7 +8769,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
setQueryStatus(pRuntimeEnv, QUERY_OVER);
}
} else {
doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data);
doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data, compressed, compLen);
}
qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId,
......@@ -8821,6 +8855,30 @@ int32_t checkForQueryBuf(size_t numOfTables) {
return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
}
bool checkNeedToCompressQueryCol(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SSDataBlock* pRes = pRuntimeEnv->outputBuf;
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) <= 0) {
return false;
}
int32_t numOfRows = pQueryAttr->pExpr2 ? GET_NUM_OF_RESULTS(pRuntimeEnv) : pRes->info.rows;
int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
int32_t colSize = pColRes->info.bytes * numOfRows;
if (NEEDTO_COMPRESS_QUERY(colSize)) {
return true;
}
}
return false;
}
void releaseQueryBuf(size_t numOfTables) {
if (tsQueryBufferSizeBytes < 0) {
return;
......
......@@ -324,6 +324,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) {
SQInfo *pQInfo = (SQInfo *)qinfo;
int32_t compLen = 0;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
......@@ -356,12 +357,21 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
}
(*pRsp)->precision = htons(pQueryAttr->precision);
(*pRsp)->compressed = (int8_t)(tsCompressColData && checkNeedToCompressQueryCol(pQInfo));
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data);
doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen);
} else {
setQueryStatus(pRuntimeEnv, QUERY_OVER);
}
if ((*pRsp)->compressed && compLen != 0) {
int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
*contLen = *contLen - pQueryAttr->resultRowSize * s + compLen + numOfCols * sizeof(int32_t);
*pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen);
}
(*pRsp)->compLen = htonl(compLen);
pQInfo->rspContext = NULL;
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
......
......@@ -25,7 +25,7 @@ class TDTestCase:
def run(self):
tdSql.query("show variables")
tdSql.checkData(54, 1, 864000)
tdSql.checkData(55, 1, 864000)
def stop(self):
tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册