提交 5d543449 编写于 作者: G Ganlin Zhao

enh(query): add distributed splitting of aggregate function

TD-16321
上级 9c21f484
......@@ -122,6 +122,10 @@ typedef enum EFunctionType {
// internal function
FUNCTION_TYPE_SELECT_VALUE,
// distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL,
FUNCTION_TYPE_APERCENTILE_MERGE,
// user defined funcion
FUNCTION_TYPE_UDF = 10000
} EFunctionType;
......
......@@ -23,6 +23,7 @@ extern "C" {
#include "function.h"
#include "functionMgt.h"
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx));
......@@ -77,6 +78,7 @@ bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t apercentileFunction(SqlFunctionCtx *pCtx);
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
......
......@@ -212,7 +212,7 @@ static bool validateApercentileAlgo(const SValueNode* pVal) {
0 == strcasecmp(varDataVal(pVal->datum.p), "t-digest"));
}
static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
static int32_t translateApercentileImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (2 != numOfParams && 3 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -261,10 +261,22 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
pValue->notReserved = true;
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
if (!isPartial) {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
} else {
pFunc->node.resType = (SDataType){.bytes = 1000, .type = TSDB_DATA_TYPE_BINARY};
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateApercentilePartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateApercentileImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateApercentileFinal(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateApercentileImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters
pFunc->node.resType =
......@@ -1142,9 +1154,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = countFunction,
.finalizeFunc = functionFinalize,
.invertFunc = countInvertFunction,
.combineFunc = combineFunction,
// .pPartialFunc = "count",
// .pMergeFunc = "sum"
.combineFunc = combineFunction,
.pPartialFunc = "count",
.pMergeFunc = "sum"
},
{
.name = "sum",
......@@ -1157,7 +1169,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = sumFunction,
.finalizeFunc = functionFinalize,
.invertFunc = sumInvertFunction,
.combineFunc = sumCombine,
.combineFunc = sumCombine,
.pPartialFunc = "sum",
.pMergeFunc = "sum"
},
{
.name = "min",
......@@ -1169,7 +1183,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = minmaxFunctionSetup,
.processFunc = minFunction,
.finalizeFunc = minmaxFunctionFinalize,
.combineFunc = minCombine
.combineFunc = minCombine,
.pPartialFunc = "min",
.pMergeFunc = "min"
},
{
.name = "max",
......@@ -1181,7 +1197,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = minmaxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = minmaxFunctionFinalize,
.combineFunc = maxCombine
.combineFunc = maxCombine,
.pPartialFunc = "max",
.pMergeFunc = "max"
},
{
.name = "stddev",
......@@ -1217,6 +1235,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = avgFinalize,
.invertFunc = avgInvertFunction,
.combineFunc = avgCombine,
.pPartialFunc = "avgPartial",
.pMergeFunc = "avgMerge"
},
{
.name = "percentile",
......@@ -1232,7 +1252,27 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "apercentile",
.type = FUNCTION_TYPE_APERCENTILE,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateApercentile,
.translateFunc = translateApercentileFinal,
.getEnvFunc = getApercentileFuncEnv,
.initFunc = apercentileFunctionSetup,
.processFunc = apercentileFunction,
.finalizeFunc = apercentileFinalize
},
{
.name = "_apercentile_partial",
.type = FUNCTION_TYPE_APERCENTILE_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateApercentilePartial,
.getEnvFunc = getApercentileFuncEnv,
.initFunc = apercentileFunctionSetup,
.processFunc = apercentileFunction,
.finalizeFunc = apercentilePartialFinalize
},
{
.name = "_apercentile_merge",
.type = FUNCTION_TYPE_APERCENTILE_MERGE,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateApercentileFinal,
.getEnvFunc = getApercentileFuncEnv,
.initFunc = apercentileFunctionSetup,
.processFunc = apercentileFunction,
......@@ -1299,7 +1339,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = firstFunction,
.finalizeFunc = firstLastFinalize,
.combineFunc = firstCombine,
.combineFunc = firstCombine,
.pPartialFunc = "first",
.pMergeFunc = "first"
},
{
.name = "last",
......@@ -1311,6 +1353,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = lastFunction,
.finalizeFunc = firstLastFinalize,
.combineFunc = lastCombine,
.pPartialFunc = "last",
.pMergeFunc = "last"
},
{
.name = "twa",
......
......@@ -2071,6 +2071,41 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return functionFinalize(pCtx, pBlock);
}
int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SVariant* pVal = &pCtx->param[1].param;
double percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
int32_t resultBytes = TMAX(bytesHist, bytesDigest);
char *tmp = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
if (pInfo->pTDigest->size > 0) {
memcpy(varDataVal(tmp), pInfo->pTDigest, resultBytes);
} else {
return TSDB_CODE_SUCCESS;
}
} else {
if (pInfo->pHisto->numOfElems > 0) {
memcpy(varDataVal(tmp), pInfo->pHisto, resultBytes);
} else {
return TSDB_CODE_SUCCESS;
}
}
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, tmp, false);
taosMemoryFree(tmp);
return pResInfo->numOfRes;
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册