提交 40f5ff71 编写于 作者: G Ganlin Zhao

add top/bot splitting

上级 b5f6272b
......@@ -124,7 +124,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function
// distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL,
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
FUNCTION_TYPE_APERCENTILE_MERGE,
FUNCTION_TYPE_SPREAD_PARTIAL,
FUNCTION_TYPE_SPREAD_MERGE,
......@@ -134,6 +134,8 @@ typedef enum EFunctionType {
FUNCTION_TYPE_HYPERLOGLOG_MERGE,
FUNCTION_TYPE_ELAPSED_PARTIAL,
FUNCTION_TYPE_ELAPSED_MERGE,
FUNCTION_TYPE_TOP_PARTIAL,
FUNCTION_TYPE_TOP_MERGE,
// user defined funcion
FUNCTION_TYPE_UDF = 10000
......
......@@ -104,6 +104,7 @@ int32_t bottomFunction(SqlFunctionCtx *pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getTopBotInfoSize();
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
......
......@@ -325,7 +325,7 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_
return TSDB_CODE_SUCCESS;
}
static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -360,8 +360,62 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTop(pFunc, pErrBuf, len);
static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (isPartial) {
if (2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
// param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*)pParamNode1;
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (pValue->datum.i < 1 || pValue->datum.i > 100) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
pValue->notReserved = true;
// set result type
pFunc->node.resType = (SDataType){.bytes = getTopBotInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (1 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (TSDB_DATA_TYPE_BINARY == para1Type) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
// set result type
SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTopBotImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTopBotImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
......@@ -1475,7 +1529,29 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "top",
.type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTop,
.translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup,
.processFunc = topFunction,
.finalizeFunc = topBotFinalize,
.combineFunc = topCombine,
},
{
.name = "_top_partial",
.type = FUNCTION_TYPE_TOP_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotPartial,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup,
.processFunc = topFunction,
.finalizeFunc = topBotFinalize,
.combineFunc = topCombine,
},
{
.name = "_top_merge",
.type = FUNCTION_TYPE_TOP_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotMerge,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup,
.processFunc = topFunction,
......@@ -1486,7 +1562,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "bottom",
.type = FUNCTION_TYPE_BOTTOM,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateBottom,
.translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup,
.processFunc = bottomFunction,
......
......@@ -2655,6 +2655,10 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
return numOfElems;
}
int32_t getTopBotInfoSize() {
return (int32_t)sizeof(STopBotRes);
}
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SValueNode* pkNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册