未验证 提交 594aa2fb 编写于 作者: G Ganlin Zhao 提交者: GitHub

Merge pull request #13738 from taosdata/enh/top_bot_split

enh(query): add top/bottom function distributed split
...@@ -124,7 +124,7 @@ typedef enum EFunctionType { ...@@ -124,7 +124,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function
// distributed splitting functions // distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL, FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
FUNCTION_TYPE_APERCENTILE_MERGE, FUNCTION_TYPE_APERCENTILE_MERGE,
FUNCTION_TYPE_SPREAD_PARTIAL, FUNCTION_TYPE_SPREAD_PARTIAL,
FUNCTION_TYPE_SPREAD_MERGE, FUNCTION_TYPE_SPREAD_MERGE,
...@@ -134,6 +134,10 @@ typedef enum EFunctionType { ...@@ -134,6 +134,10 @@ typedef enum EFunctionType {
FUNCTION_TYPE_HYPERLOGLOG_MERGE, FUNCTION_TYPE_HYPERLOGLOG_MERGE,
FUNCTION_TYPE_ELAPSED_PARTIAL, FUNCTION_TYPE_ELAPSED_PARTIAL,
FUNCTION_TYPE_ELAPSED_MERGE, FUNCTION_TYPE_ELAPSED_MERGE,
FUNCTION_TYPE_TOP_PARTIAL,
FUNCTION_TYPE_TOP_MERGE,
FUNCTION_TYPE_BOTTOM_PARTIAL,
FUNCTION_TYPE_BOTTOM_MERGE,
// user defined funcion // user defined funcion
FUNCTION_TYPE_UDF = 10000 FUNCTION_TYPE_UDF = 10000
......
...@@ -99,11 +99,18 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); ...@@ -99,11 +99,18 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t topFunction(SqlFunctionCtx *pCtx); int32_t topFunction(SqlFunctionCtx *pCtx);
int32_t topFunctionMerge(SqlFunctionCtx *pCtx);
int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx);
int32_t bottomFunctionMerge(SqlFunctionCtx *pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getTopBotInfoSize(int64_t numOfItems);
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
......
...@@ -326,7 +326,7 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_ ...@@ -326,7 +326,7 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (2 != numOfParams) { if (2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
...@@ -361,8 +361,62 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { ...@@ -361,8 +361,62 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
return translateTop(pFunc, pErrBuf, len); int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (isPartial) {
if (2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
// param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*)pParamNode1;
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (pValue->datum.i < 1 || pValue->datum.i > 100) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
pValue->notReserved = true;
// set result type
pFunc->node.resType = (SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (1 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (TSDB_DATA_TYPE_BINARY != para1Type) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
// Do nothing. We can only access output of partial functions as input,
// so original input type cannot be obtained, resType will be set same
// as original function input type after merge function created.
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTopBotImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTopBotImpl(pFunc, pErrBuf, len, false);
} }
static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
...@@ -1483,23 +1537,71 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1483,23 +1537,71 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "top", .name = "top",
.type = FUNCTION_TYPE_TOP, .type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTop, .translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup, .initFunc = topBotFunctionSetup,
.processFunc = topFunction, .processFunc = topFunction,
.finalizeFunc = topBotFinalize, .finalizeFunc = topBotFinalize,
.combineFunc = topCombine, .combineFunc = topCombine,
.pPartialFunc = "_top_partial",
.pMergeFunc = "_top_merge"
},
{
.name = "_top_partial",
.type = FUNCTION_TYPE_TOP_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotPartial,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = topBotFunctionSetup,
.processFunc = topFunction,
.finalizeFunc = topBotPartialFinalize,
.combineFunc = topCombine,
},
{
.name = "_top_merge",
.type = FUNCTION_TYPE_TOP_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotMerge,
.getEnvFunc = getTopBotMergeFuncEnv,
.initFunc = functionSetup,
.processFunc = topFunctionMerge,
.finalizeFunc = topBotMergeFinalize,
.combineFunc = topCombine,
}, },
{ {
.name = "bottom", .name = "bottom",
.type = FUNCTION_TYPE_BOTTOM, .type = FUNCTION_TYPE_BOTTOM,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateBottom, .translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup, .initFunc = topBotFunctionSetup,
.processFunc = bottomFunction, .processFunc = bottomFunction,
.finalizeFunc = topBotFinalize, .finalizeFunc = topBotFinalize,
.combineFunc = bottomCombine, .combineFunc = bottomCombine,
.pPartialFunc = "_bottom_partial",
.pMergeFunc = "_bottom_merge"
},
{
.name = "_bottom_partial",
.type = FUNCTION_TYPE_BOTTOM_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotPartial,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = topBotFunctionSetup,
.processFunc = bottomFunction,
.finalizeFunc = topBotPartialFinalize,
.combineFunc = bottomCombine,
},
{
.name = "_bottom_merge",
.type = FUNCTION_TYPE_BOTTOM_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotMerge,
.getEnvFunc = getTopBotMergeFuncEnv,
.initFunc = functionSetup,
.processFunc = bottomFunctionMerge,
.finalizeFunc = topBotMergeFinalize,
.combineFunc = bottomCombine,
}, },
{ {
.name = "spread", .name = "spread",
......
...@@ -66,6 +66,9 @@ typedef struct STopBotResItem { ...@@ -66,6 +66,9 @@ typedef struct STopBotResItem {
} STopBotResItem; } STopBotResItem;
typedef struct STopBotRes { typedef struct STopBotRes {
int32_t maxSize;
int16_t type; //store the original input type, used in merge function
int32_t numOfItems;
STopBotResItem* pItems; STopBotResItem* pItems;
} STopBotRes; } STopBotRes;
...@@ -2647,12 +2650,35 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { ...@@ -2647,12 +2650,35 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
return numOfElems; return numOfElems;
} }
int32_t getTopBotInfoSize(int64_t numOfItems) {
return sizeof(STopBotRes) + numOfItems * sizeof(STopBotResItem);
}
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SValueNode* pkNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); SValueNode* pkNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem); pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
return true; return true;
} }
bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
//intermediate result is binary and length contains VAR header size
pEnv->calcMemSize = pFunc->node.resType.bytes;
return true;
}
bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false;
}
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
pRes->maxSize = pCtx->param[1].param.i;
return true;
}
static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
...@@ -2664,6 +2690,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { ...@@ -2664,6 +2690,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
int32_t topFunction(SqlFunctionCtx* pCtx) { int32_t topFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
...@@ -2671,7 +2699,8 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { ...@@ -2671,7 +2699,8 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type; STopBotRes* pRes = getTopBotOutputInfo(pCtx);
pRes->type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < pInput->numOfRows + start; ++i) { for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
...@@ -2681,7 +2710,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { ...@@ -2681,7 +2710,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
numOfElems++; numOfElems++;
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, true); doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2694,7 +2723,8 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { ...@@ -2694,7 +2723,8 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type; STopBotRes* pRes = getTopBotOutputInfo(pCtx);
pRes->type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < pInput->numOfRows + start; ++i) { for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
...@@ -2704,8 +2734,53 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { ...@@ -2704,8 +2734,53 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
numOfElems++; numOfElems++;
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false); doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
}
return TSDB_CODE_SUCCESS;
}
static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) {
for (int32_t i = 0; i < pInput->numOfItems; i++) {
addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery);
} }
}
int32_t topFunctionMerge(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data);
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->maxSize = pInputInfo->maxSize;
pInfo->type = pInputInfo->type;
topBotTransferInfo(pCtx, pInputInfo, true);
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data);
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->maxSize = pInputInfo->maxSize;
pInfo->type = pInputInfo->type;
topBotTransferInfo(pCtx, pInputInfo, false);
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2740,7 +2815,6 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par ...@@ -2740,7 +2815,6 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
SVariant val = {0}; SVariant val = {0};
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
...@@ -2749,7 +2823,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData ...@@ -2749,7 +2823,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
assert(pItems != NULL); assert(pItems != NULL);
// not full yet // not full yet
if (pEntryInfo->numOfRes < maxSize) { if (pEntryInfo->numOfRes < pRes->maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes]; STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = val; pItem->v = val;
pItem->uid = uid; pItem->uid = uid;
...@@ -2759,6 +2833,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData ...@@ -2759,6 +2833,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// allocate the buffer and keep the data of this row into the new allocated buffer // allocate the buffer and keep the data of this row into the new allocated buffer
pEntryInfo->numOfRes++; pEntryInfo->numOfRes++;
// accumulate number of items for each vgroup, this info is needed for merge
pRes->numOfItems++;
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
!isTopQuery); !isTopQuery);
} else { // replace the minimum value in the result } else { // replace the minimum value in the result
...@@ -2860,11 +2936,11 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS ...@@ -2860,11 +2936,11 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
releaseBufPage(pCtx->pBuf, pPage); releaseBufPage(pCtx->pBuf, pPage);
} }
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
int32_t type = pCtx->input.pData[0]->info.type; int16_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
...@@ -2880,29 +2956,58 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -2880,29 +2956,58 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false); colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
} }
if (!isMerge) {
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
}
currentRow += 1; currentRow += 1;
} }
return pEntryInfo->numOfRes; return pEntryInfo->numOfRes;
} }
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return topBotFinalizeImpl(pCtx, pBlock, false);
}
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return topBotFinalizeImpl(pCtx, pBlock, true);
}
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getTopBotInfoSize(pRes->maxSize);
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pRes, resultBytes);
varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, res, false);
taosMemoryFree(res);
return 1;
}
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
bool isTopQuery) { bool isTopQuery) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
STopBotResItem* pItems = pRes->pItems; STopBotResItem* pItems = pRes->pItems;
assert(pItems != NULL); assert(pItems != NULL);
// not full yet // not full yet
if (pEntryInfo->numOfRes < maxSize) { if (pEntryInfo->numOfRes < pRes->maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes]; STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = pSourceItem->v; pItem->v = pSourceItem->v;
pItem->uid = pSourceItem->uid; pItem->uid = pSourceItem->uid;
pItem->tuplePos.pageId = -1; pItem->tuplePos.pageId = -1;
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos); replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
pEntryInfo->numOfRes++; pEntryInfo->numOfRes++;
// accumulate number of items for each vgroup, this info is needed for merge
pRes->numOfItems++;
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
!isTopQuery); !isTopQuery);
} else { // replace the minimum value in the result } else { // replace the minimum value in the result
...@@ -2948,15 +3053,15 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { ...@@ -2948,15 +3053,15 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getSpreadInfoSize() {
return (int32_t)sizeof(SSpreadInfo);
}
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SSpreadInfo); pEnv->calcMemSize = sizeof(SSpreadInfo);
return true; return true;
} }
int32_t getSpreadInfoSize() {
return (int32_t)sizeof(SSpreadInfo);
}
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) { if (!functionSetup(pCtx, pResultInfo)) {
return false; return false;
...@@ -3083,7 +3188,7 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -3083,7 +3188,7 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = (int32_t)sizeof(SSpreadInfo); int32_t resultBytes = getSpreadInfoSize();
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes); memcpy(varDataVal(res), pInfo, resultBytes);
......
...@@ -202,6 +202,27 @@ bool fmIsInvertible(int32_t funcId) { ...@@ -202,6 +202,27 @@ bool fmIsInvertible(int32_t funcId) {
return res; return res;
} }
//function has same input/output type
bool fmIsSameInOutType(int32_t funcId) {
bool res = false;
switch (funcMgtBuiltins[funcId].type) {
case FUNCTION_TYPE_MAX:
case FUNCTION_TYPE_MIN:
case FUNCTION_TYPE_TOP:
case FUNCTION_TYPE_BOTTOM:
case FUNCTION_TYPE_FIRST:
case FUNCTION_TYPE_LAST:
case FUNCTION_TYPE_SAMPLE:
case FUNCTION_TYPE_TAIL:
case FUNCTION_TYPE_UNIQUE:
res = true;
break;
default:
break;
}
return res;
}
static int32_t getFuncInfo(SFunctionNode* pFunc) { static int32_t getFuncInfo(SFunctionNode* pFunc) {
char msg[64] = {0}; char msg[64] = {0};
if (NULL != gFunMgtService.pFuncNameHashTable) { if (NULL != gFunMgtService.pFuncNameHashTable) {
...@@ -276,6 +297,10 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio ...@@ -276,6 +297,10 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
nodesDestroyList(pParameterList); nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
//overwrite function restype set by translate function
if (fmIsSameInOutType(pSrcFunc->funcId)) {
(*pMergeFunc)->node.resType = pSrcFunc->node.resType;
}
strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName); strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册