Unverified commit 05c7bc17 authored by dapan1121, committed by GitHub

Merge pull request #20098 from taosdata/feat/TS-2671

feat(query): optimize percentile function performance
......@@ -796,19 +796,23 @@ HISTOGRAM(expr,bin_type, bin_description, normalized)
### PERCENTILE
```sql
PERCENTILE(expr, p)
PERCENTILE(expr, p [, p1] ...)
```
**Description**: The value whose rank in a specific column matches the specified percentage. If no value in the column matches the specified percentage exactly, an interpolated value is returned.
**Return value type**: DOUBLE
**Return value type**: This function takes a minimum of 2 and a maximum of 11 parameters, and it can return up to 10 percentiles at a time. If 2 parameters are given, a single percentile is returned and the value type is DOUBLE.
If more than 2 parameters are given, the return value type is VARCHAR, formatted as a JSON array containing all returned values.
**Applicable column types**: Numeric
**Applicable table types**: table only
**More explanations**: _p_ is in the range [0,100]. When _p_ is 0, the result is the same as the MIN function; when _p_ is 100, the result is the same as MAX.
**More explanations**:
- _p_ is in the range [0,100]. When _p_ is 0, the result is the same as the MIN function; when _p_ is 100, the result is the same as MAX.
- When calculating multiple percentiles of a specific column, a single PERCENTILE function with multiple parameters is advised, as this can greatly reduce the query response time.
For example, SELECT percentile(col, 90, 95, 99) FROM table performs better than SELECT percentile(col, 90), percentile(col, 95), percentile(col, 99) FROM table, as sketched below.
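As an illustrative sketch only (the table name `d1001` and column name `current` are assumptions, not part of this change):
```sql
-- Recommended: one call computes all requested percentiles in a single pass.
-- With more than two parameters the result is one VARCHAR value containing a JSON array.
SELECT PERCENTILE(current, 90, 95, 99) FROM d1001;

-- Returns equivalent percentiles, but each call is evaluated separately and is therefore slower.
SELECT PERCENTILE(current, 90), PERCENTILE(current, 95), PERCENTILE(current, 99) FROM d1001;
```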
## Selection Functions
......
......@@ -798,18 +798,22 @@ HISTOGRAM(expr,bin_type, bin_description, normalized)
### PERCENTILE
```sql
PERCENTILE(expr, p)
PERCENTILE(expr, p [, p1] ... )
```
**Description**: Returns the value at the given percentile(s) of a column in a table.
**Return value type**: DOUBLE
**Return value type**: This function takes a minimum of 2 and a maximum of 11 parameters, and can return up to 10 percentiles at a time. When 2 parameters are given, a single percentile is returned as DOUBLE; when more than 2 parameters are given, the return type is VARCHAR, formatted as a JSON array containing all returned values.
**Applicable column types**: Numeric.
**Applicable table types**: table only.
**More explanations**: _P_ is in the range 0 ≤ _P_ ≤ 100; 0 is equivalent to MIN and 100 is equivalent to MAX.
**More explanations**:
- _P_ is in the range 0 ≤ _P_ ≤ 100; 0 is equivalent to MIN and 100 is equivalent to MAX.
- When calculating multiple percentiles of the same column, a single PERCENTILE call with multiple parameters is advised, as this greatly reduces the query response time.
For example, SELECT percentile(col, 90, 95, 99) FROM table performs better than SELECT percentile(col, 90), percentile(col, 95), percentile(col, 99) FROM table; the result format is sketched below.
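For illustration only, a sketch of the JSON-array result shape when more than two percentiles are requested; the table name `d1001`, the column name `current`, and the numeric values are assumptions, while the six-decimal comma-separated formatting follows the finalize code and test expectations shown further below:
```sql
SELECT PERCENTILE(current, 50, 90, 99) FROM d1001;
-- returns a single VARCHAR value shaped like:
-- [10.150000, 12.300000, 12.950000]
```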
## Selection Functions
......
......@@ -76,7 +76,7 @@ enum {
enum {
MAIN_SCAN = 0x0u,
REVERSE_SCAN = 0x1u, // todo remove it
REPEAT_SCAN = 0x2u, // repeat scan belongs to the master scan
PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before the main scan
};
typedef struct SPoint1 {
......
......@@ -397,19 +397,20 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
}
}
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
SqlFunctionCtx* pCtx = pExprSup->pCtx;
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
pCtx[i].order = order;
pCtx[i].input.numOfRows = pBlock->info.rows;
setBlockSMAInfo(&pCtx[i], &pExprSup->pExprInfo[i], pBlock);
pCtx[i].pSrcBlock = pBlock;
pCtx[i].scanFlag = scanFlag;
}
}
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
if (pBlock->pBlockAgg != NULL) {
doSetInputDataBlockInfo(pExprSup, pBlock, order);
doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag);
} else {
doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
}
......@@ -539,7 +540,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return false;
}
if (pCtx->scanFlag == REPEAT_SCAN) {
if (pCtx->scanFlag == PRE_SCAN) {
return fmIsRepeatScanFunc(pCtx->functionId);
}
......
......@@ -703,7 +703,8 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->base.scanFlag = REPEAT_SCAN;
pTableScanInfo->base.scanFlag = MAIN_SCAN;
pTableScanInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
// do prepare for the next round table scan operation
......@@ -729,7 +730,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
if (pTableScanInfo->scanTimes < total) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->base.scanFlag = REPEAT_SCAN;
pTableScanInfo->base.scanFlag = MAIN_SCAN;
qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
......@@ -881,8 +882,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
}
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
pInfo->base.scanFlag = MAIN_SCAN;
pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
pInfo->base.readHandle = *readHandle;
pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;
......
......@@ -504,27 +504,45 @@ static int32_t translateTimezone(SFunctionNode* pFunc, char* pErrBuf, int32_t le
}
static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (2 != LIST_LENGTH(pFunc->pParameterList)) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (numOfParams < 2 || numOfParams > 11) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
// param1
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
if (pValue->datum.i < 0 || pValue->datum.i > 100) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pValue->notReserved = true;
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type) || (!IS_SIGNED_NUMERIC_TYPE(para2Type) && !IS_UNSIGNED_NUMERIC_TYPE(para2Type))) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
for (int32_t i = 1; i < numOfParams; ++i) {
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, i);
pValue->notReserved = true;
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
if (!IS_NUMERIC_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
double v = 0;
if (IS_INTEGER_TYPE(paraType)) {
v = (double)pValue->datum.i;
} else {
v = pValue->datum.d;
}
if (v < 0 || v > 100) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
}
// set result type
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
if (numOfParams > 2) {
pFunc->node.resType = (SDataType){.bytes = 512, .type = TSDB_DATA_TYPE_VARCHAR};
} else {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
}
return TSDB_CODE_SUCCESS;
}
......@@ -2273,8 +2291,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "percentile",
.type = FUNCTION_TYPE_PERCENTILE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_REPEAT_SCAN_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_REPEAT_SCAN_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translatePercentile,
.dataRequiredFunc = statisDataRequired,
.getEnvFunc = getPercentileFuncEnv,
.initFunc = percentileFunctionSetup,
.processFunc = percentileFunction,
......
......@@ -1599,7 +1599,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
int32_t type = pCol->info.type;
SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) {
if (pCtx->scanFlag == MAIN_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed
......@@ -1682,26 +1682,67 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
}
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SVariant* pVal = &pCtx->param[1].param;
int32_t code = 0;
double v = 0;
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
int32_t code = 0;
double v = 0;
tMemBucket* pMemBucket = ppInfo->pMemBucket;
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null
code = TSDB_CODE_FAILED;
goto _fin_error;
}
if (pCtx->numOfParams > 2) {
char buf[512] = {0};
size_t len = 1;
varDataVal(buf)[0] = '[';
for (int32_t i = 1; i < pCtx->numOfParams; ++i) {
SVariant* pVal = &pCtx->param[i].param;
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
int32_t code = getPercentile(pMemBucket, v, &ppInfo->result);
if (code != TSDB_CODE_SUCCESS) {
goto _fin_error;
}
if (i == pCtx->numOfParams - 1) {
len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf]", ppInfo->result);
} else {
len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf, ", ppInfo->result);
}
}
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
varDataSetLen(buf, len);
colDataAppend(pCol, pBlock->info.rows, buf, false);
tMemBucketDestroy(pMemBucket);
return pResInfo->numOfRes;
} else {
SVariant* pVal = &pCtx->param[1].param;
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
code = getPercentile(pMemBucket, v, &ppInfo->result);
if (code != TSDB_CODE_SUCCESS) {
goto _fin_error;
}
tMemBucketDestroy(pMemBucket);
return functionFinalize(pCtx, pBlock);
}
_fin_error:
tMemBucketDestroy(pMemBucket);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return code;
return functionFinalize(pCtx, pBlock);
}
bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
......
......@@ -50,7 +50,7 @@ class TDTestCase:
'col12': f'binary({self.binary_length})',
'col13': f'nchar({self.nchar_length})'
}
self.tag_dict = {
'ts_tag' : 'timestamp',
't1': 'tinyint',
......@@ -85,19 +85,19 @@ class TDTestCase:
self.tag_values = [
f'{self.tag_ts},{self.tag_tinyint},{self.tag_smallint},{self.tag_int},{self.tag_bigint},\
{self.tag_utint},{self.tag_usint},{self.tag_uint},{self.tag_ubint},{self.tag_float},{self.tag_double},{self.tag_bool},"{self.binary_str}","{self.nchar_str}"'
]
self.param = [1,50,100]
def insert_data(self,column_dict,tbname,row_num):
intData = []
intData = []
floatData = []
insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str)
for i in range(row_num):
insert_list = []
self.setsql.insert_values(column_dict,i,insert_sql,insert_list,self.ts)
intData.append(i)
intData.append(i)
floatData.append(i + 0.1)
return intData,floatData
def check_tags(self,tags,param,num,value):
......@@ -117,6 +117,20 @@ class TDTestCase:
else:
tdSql.query(f'select percentile({k}, {param}) from {self.ntbname}')
tdSql.checkData(0, 0, np.percentile(floatData, param))
tdSql.query(f'select percentile(col1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100) from {self.ntbname}')
tdSql.checkData(0, 0, '[0.900000, 1.800000, 2.700000, 3.600000, 4.500000, 5.400000, 6.300000, 7.200000, 8.100000, 9.000000]')
tdSql.query(f'select percentile(col1, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.ntbname}')
tdSql.checkData(0, 0, '[0.891000, 1.791000, 2.691000, 3.591000, 4.491000, 5.391000, 6.291000, 7.191000, 8.091000, 8.991000]')
tdSql.error(f'select percentile(col1) from {self.ntbname}')
tdSql.error(f'select percentile(col1, -1) from {self.ntbname}')
tdSql.error(f'select percentile(col1, 101) from {self.ntbname}')
tdSql.error(f'select percentile(col1, col2) from {self.ntbname}')
tdSql.error(f'select percentile(1, col1) from {self.ntbname}')
tdSql.error(f'select percentile(col1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 101) from {self.ntbname}')
tdSql.execute(f'drop database {self.dbname}')
def function_check_ctb(self):
tdSql.execute(f'create database {self.dbname}')
......@@ -135,7 +149,7 @@ class TDTestCase:
else:
tdSql.query(f'select percentile({k}, {param}) from {self.stbname}_{i}')
tdSql.checkData(0, 0, np.percentile(floatData, param))
for k,v in self.tag_dict.items():
for param in self.param:
if v.lower() in ['timestamp','bool'] or 'binary' in v.lower() or 'nchar' in v.lower():
......@@ -145,11 +159,25 @@ class TDTestCase:
data_num = tdSql.queryResult[0][0]
tdSql.query(f'select percentile({k},{param}) from {self.stbname}_{i}')
tdSql.checkData(0,0,data_num)
tdSql.execute(f'drop database {self.dbname}')
tdSql.query(f'select percentile(col1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100) from {self.stbname}_0')
tdSql.checkData(0, 0, '[0.900000, 1.800000, 2.700000, 3.600000, 4.500000, 5.400000, 6.300000, 7.200000, 8.100000, 9.000000]')
tdSql.query(f'select percentile(col1, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.stbname}_0')
tdSql.checkData(0, 0, '[0.891000, 1.791000, 2.691000, 3.591000, 4.491000, 5.391000, 6.291000, 7.191000, 8.091000, 8.991000]')
tdSql.error(f'select percentile(col1) from {self.stbname}_0')
tdSql.error(f'select percentile(col1, -1) from {self.stbname}_0')
tdSql.error(f'select percentile(col1, 101) from {self.stbname}_0')
tdSql.error(f'select percentile(col1, col2) from {self.stbname}_0')
tdSql.error(f'select percentile(1, col1) from {self.stbname}_0')
tdSql.error(f'select percentile(col1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 101) from {self.stbname}_0')
tdSql.execute(f'drop database {self.dbname}')
def run(self):
self.function_check_ntb()
self.function_check_ctb()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......