From a030a9f32fdc1e9dc04f7ea00b5a1922cdea5ab5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 May 2022 16:06:15 +0800 Subject: [PATCH] enh(query):support selectivity function and normal column data query. --- source/libs/executor/src/executorimpl.c | 41 +- source/libs/function/inc/builtinsimpl.h | 6 +- source/libs/function/src/builtins.c | 20 +- source/libs/function/src/builtinsimpl.c | 510 ++++++++++++++++++------ 4 files changed, 419 insertions(+), 158 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1327ddb48e..bd9bbec666 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -803,7 +804,8 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { pCtx[k].startTs = startTs; // this can be set during create the struct - pCtx[k].fpSet.process(&pCtx[k]); + if (pCtx[k].fpSet.process != NULL) + pCtx[k].fpSet.process(&pCtx[k]); } } } @@ -1074,35 +1076,36 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* // set the output buffer for the selectivity + tag query static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) { int32_t num = 0; - int16_t tagLen = 0; SqlFunctionCtx* p = NULL; - SqlFunctionCtx** pTagCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES); - if (pTagCtx == NULL) { + SqlFunctionCtx** pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES); + if (pValCtx == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } for (int32_t i = 0; i < numOfOutput; ++i) { - int32_t functionId = pCtx[i].functionId; - - if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { - tagLen += pCtx[i].resDataInfo.bytes; - pTagCtx[num++] = &pCtx[i]; - } else if (1 /*(aAggs[functionId].status & FUNCSTATE_SELECTIVITY) != 0*/) { - p = &pCtx[i]; - } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) { - // tag function may be the group by tag column - // ts may be the required primary timestamp column - continue; + if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) { + pValCtx[num++] = &pCtx[i]; } else { - // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ + p = &pCtx[i]; } +// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { +// tagLen += pCtx[i].resDataInfo.bytes; +// pTagCtx[num++] = &pCtx[i]; +// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) { +// // tag function may be the group by tag column +// // ts may be the required primary timestamp column +// continue; +// } else { +// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ +// } } + if (p != NULL) { - p->subsidiaries.pCtx = pTagCtx; + p->subsidiaries.pCtx = pValCtx; p->subsidiaries.num = num; } else { - taosMemoryFreeClear(pTagCtx); + taosMemoryFreeClear(pValCtx); } return TSDB_CODE_SUCCESS; @@ -2219,6 +2222,8 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); if (pCtx[j].fpSet.process) { pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { + // do nothing, todo refactor } else { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 1f2ad0797d..97832d007e 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -37,11 +37,11 @@ bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t sumFunction(SqlFunctionCtx *pCtx); int32_t sumInvertFunction(SqlFunctionCtx *pCtx); -bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool minmaxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t minFunction(SqlFunctionCtx* pCtx); int32_t maxFunction(SqlFunctionCtx *pCtx); +int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); @@ -82,6 +82,8 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn int32_t histogramFunction(SqlFunctionCtx* pCtx); int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 5aa1b63c79..a7b2607778 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -509,9 +509,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateInOutNum, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getMinmaxFuncEnv, - .initFunc = minFunctionSetup, + .initFunc = minmaxFunctionSetup, .processFunc = minFunction, - .finalizeFunc = functionFinalize + .finalizeFunc = minmaxFunctionFinalize }, { .name = "max", @@ -520,9 +520,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateInOutNum, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, + .initFunc = minmaxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalize + .finalizeFunc = minmaxFunctionFinalize }, { .name = "stddev", @@ -562,7 +562,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateApercentile, .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, + .initFunc = minmaxFunctionSetup, .processFunc = maxFunction, .finalizeFunc = functionFinalize }, @@ -581,8 +581,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_BOTTOM, .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateBottom, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = functionSetup, .processFunc = maxFunction, .finalizeFunc = functionFinalize }, @@ -603,7 +603,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC, .translateFunc = translateLastRow, .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, + .initFunc = minmaxFunctionSetup, .processFunc = maxFunction, .finalizeFunc = functionFinalize }, @@ -1032,8 +1032,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_SELECT_VALUE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC, .translateFunc = translateSelectValue, - .getEnvFunc = NULL, - .initFunc = NULL, + .getEnvFunc = getSelectivityFuncEnv, // todo remove this function later. + .initFunc = functionSetup, .sprocessFunc = NULL, .finalizeFunc = NULL } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 9c1601b61a..3d77b7b218 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -37,13 +37,15 @@ typedef struct SAvgRes { int64_t count; } SAvgRes; +typedef struct STuplePos { + int32_t pageId; + int32_t offset; +} STuplePos; + typedef struct STopBotResItem { - SVariant v; - uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data - struct { - int32_t pageId; - int32_t offset; - } tuplePos; // tuple data of this chosen row + SVariant v; + uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data + STuplePos tuplePos; // tuple data of this chosen row } STopBotResItem; typedef struct STopBotRes { @@ -616,101 +618,25 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin return FUNC_DATA_REQUIRED_STATIS_LOAD; } -bool maxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; - } - - char* buf = GET_ROWCELL_INTERBUF(pResultInfo); - switch (pCtx->resDataInfo.type) { - case TSDB_DATA_TYPE_INT: - *((int32_t*)buf) = INT32_MIN; - break; - case TSDB_DATA_TYPE_UINT: - *((uint32_t*)buf) = 0; - break; - case TSDB_DATA_TYPE_FLOAT: - *((float*)buf) = -FLT_MAX; - break; - case TSDB_DATA_TYPE_DOUBLE: - SET_DOUBLE_VAL(((double*)buf), -DBL_MAX); - break; - case TSDB_DATA_TYPE_BIGINT: - *((int64_t*)buf) = INT64_MIN; - break; - case TSDB_DATA_TYPE_UBIGINT: - *((uint64_t*)buf) = 0; - break; - case TSDB_DATA_TYPE_SMALLINT: - *((int16_t*)buf) = INT16_MIN; - break; - case TSDB_DATA_TYPE_USMALLINT: - *((uint16_t*)buf) = 0; - break; - case TSDB_DATA_TYPE_TINYINT: - *((int8_t*)buf) = INT8_MIN; - break; - case TSDB_DATA_TYPE_UTINYINT: - *((uint8_t*)buf) = 0; - break; - case TSDB_DATA_TYPE_BOOL: - *((int8_t*)buf) = 0; - break; - default: - assert(0); - } - return true; -} +typedef struct SMinmaxResInfo { + bool assign; // assign the first value or not + int64_t v; + STuplePos tuplePos; +} SMinmaxResInfo; -bool minFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { +bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { if (!functionSetup(pCtx, pResultInfo)) { return false; // not initialized since it has been initialized } - char* buf = GET_ROWCELL_INTERBUF(pResultInfo); - switch (pCtx->resDataInfo.type) { - case TSDB_DATA_TYPE_TINYINT: - *((int8_t*)buf) = INT8_MAX; - break; - case TSDB_DATA_TYPE_UTINYINT: - *(uint8_t*)buf = UINT8_MAX; - break; - case TSDB_DATA_TYPE_SMALLINT: - *((int16_t*)buf) = INT16_MAX; - break; - case TSDB_DATA_TYPE_USMALLINT: - *((uint16_t*)buf) = UINT16_MAX; - break; - case TSDB_DATA_TYPE_INT: - *((int32_t*)buf) = INT32_MAX; - break; - case TSDB_DATA_TYPE_UINT: - *((uint32_t*)buf) = UINT32_MAX; - break; - case TSDB_DATA_TYPE_BIGINT: - *((int64_t*)buf) = INT64_MAX; - break; - case TSDB_DATA_TYPE_UBIGINT: - *((uint64_t*)buf) = UINT64_MAX; - break; - case TSDB_DATA_TYPE_FLOAT: - *((float*)buf) = FLT_MAX; - break; - case TSDB_DATA_TYPE_DOUBLE: - SET_DOUBLE_VAL(((double*)buf), DBL_MAX); - break; - case TSDB_DATA_TYPE_BOOL: - *((int8_t*)buf) = 1; - break; - default: - assert(0); - } - + SMinmaxResInfo* buf = GET_ROWCELL_INTERBUF(pResultInfo); + buf->assign = false; + buf->tuplePos.pageId = -1; return true; } bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { - pEnv->calcMemSize = sizeof(int64_t); + pEnv->calcMemSize = sizeof(SMinmaxResInfo); return true; } @@ -758,6 +684,9 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { } \ } while (0) +static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); +static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); + int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { int32_t numOfElems = 0; @@ -768,7 +697,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { int32_t type = pCol->info.type; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); + SMinmaxResInfo *pBuf = GET_ROWCELL_INTERBUF(pResInfo); // data in current data block are qualified to the query if (pInput->colDataAggIsSet) { @@ -795,11 +724,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (IS_SIGNED_NUMERIC_TYPE(type)) { int64_t prev = 0; - GET_TYPED_DATA(prev, int64_t, type, buf); + GET_TYPED_DATA(prev, int64_t, type, pBuf->v); int64_t val = GET_INT64_VAL(tval); if ((prev < val) ^ isMinFunc) { - *(int64_t*)buf = val; + pBuf->v = val; for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor @@ -809,14 +738,16 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { __ctx->fpSet.process(__ctx); } + + saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { uint64_t prev = 0; - GET_TYPED_DATA(prev, uint64_t, type, buf); + GET_TYPED_DATA(prev, uint64_t, type, pBuf->v); uint64_t val = GET_UINT64_VAL(tval); if ((prev < val) ^ isMinFunc) { - *(uint64_t*)buf = val; + pBuf->v = val; for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor @@ -829,12 +760,13 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } } else if (type == TSDB_DATA_TYPE_DOUBLE) { double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key); + UPDATE_DATA(pCtx, *(double*)&pBuf->v, val, numOfElems, isMinFunc, key); } else if (type == TSDB_DATA_TYPE_FLOAT) { double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, *(float*)buf, val, numOfElems, isMinFunc, key); + UPDATE_DATA(pCtx, *(float*)&pBuf->v, val, numOfElems, isMinFunc, key); } + pBuf->assign = true; return numOfElems; } @@ -843,47 +775,318 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) { - LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems); + int8_t* pData = (int8_t*)pCol->pData; + int8_t* val = (int8_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_SMALLINT) { - LOOPCHECK_N(*(int16_t*)buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems); + int16_t* pData = (int16_t*)pCol->pData; + int16_t* val = (int16_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_INT) { int32_t* pData = (int32_t*)pCol->pData; - int32_t* val = (int32_t*)buf; + int32_t* val = (int32_t*)&pBuf->v; for (int32_t i = start; i < start + numOfRows; ++i) { if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { continue; } - if ((*val < pData[i]) ^ isMinFunc) { + if (!pBuf->assign) { *val = pData[i]; - TSKEY ts = (pCtx->ptsList != NULL) ? GET_TS_DATA(pCtx, i) : 0; - DO_UPDATE_SUBSID_RES(pCtx, ts); + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } } numOfElems += 1; } - -#if defined(_DEBUG_VIEW) - qDebug("max value updated:%d", *retVal); -#endif } else if (type == TSDB_DATA_TYPE_BIGINT) { - LOOPCHECK_N(*(int64_t*)buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems); + int64_t* pData = (int64_t*)pCol->pData; + int64_t* val = (int64_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { if (type == TSDB_DATA_TYPE_UTINYINT) { - LOOPCHECK_N(*(uint8_t*)buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems); + uint8_t* pData = (uint8_t*)pCol->pData; + uint8_t* val = (uint8_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_USMALLINT) { - LOOPCHECK_N(*(uint16_t*)buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems); + uint16_t* pData = (uint16_t*)pCol->pData; + uint16_t* val = (uint16_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_UINT) { - LOOPCHECK_N(*(uint32_t*)buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems); + uint32_t* pData = (uint32_t*)pCol->pData; + uint32_t* val = (uint32_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_UBIGINT) { - LOOPCHECK_N(*(uint64_t*)buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems); + uint64_t* pData = (uint64_t*)pCol->pData; + uint64_t* val = (uint64_t*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } } else if (type == TSDB_DATA_TYPE_DOUBLE) { - LOOPCHECK_N(*(double*)buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems); + double* pData = (double*)pCol->pData; + double* val = (double*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } else if (type == TSDB_DATA_TYPE_FLOAT) { - LOOPCHECK_N(*(float*)buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems); + float* pData = (float*)pCol->pData; + float* val = (float*)&pBuf->v; + + for (int32_t i = start; i < start + numOfRows; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if (!pBuf->assign) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + pBuf->assign = true; + } else { + // ignore the equivalent data value + if ((*val) == pData[i]) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } + + numOfElems += 1; + } } return numOfElems; @@ -901,6 +1104,53 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + + SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + // todo assign the tag value + int32_t currentRow = pBlock->info.rows; + colDataAppend(pCol, currentRow, (const char*)&pRes->v, false); + + int32_t pageId = pRes->tuplePos.pageId; + int32_t offset = pRes->tuplePos.offset; + if (pRes->tuplePos.pageId != -1) { + SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); + + bool* nullList = (bool*)((char*)pPage + offset); + char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool)); + + // todo set the offset value to optimize the performance. + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + + SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; + int32_t srcSlotId = pFuncParam->pCol->slotId; + int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + + int32_t ps = 0; + for (int32_t k = 0; k < srcSlotId; ++k) { + SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k); + ps += pSrcCol->info.bytes; + } + + SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); + if (nullList[srcSlotId]) { + colDataAppendNULL(pDstCol, currentRow); + } else { + colDataAppend(pDstCol, currentRow, (pStart + ps), false); + } + } + } + + return pEntryInfo->numOfRes; +} + bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SStddevRes); return true; @@ -1244,6 +1494,14 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } +bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); + pEnv->calcMemSize = pNode->node.resType.bytes; + return true; +} + + + static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) { if (pTsColInfo == NULL) { return 0; @@ -1624,9 +1882,6 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo); -static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem); -static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem); - int32_t topFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -1701,7 +1956,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData pItem->uid = uid; // save the data of this tuple - saveTupleData(pCtx, rowIndex, pSrcBlock, pItem); + saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; @@ -1716,15 +1971,14 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData pItem->uid = uid; // save the data of this tuple by over writing the old data - copyTupleData(pCtx, rowIndex, pSrcBlock, pItem); - + copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, topBotResComparFn, NULL, false); } } } -void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) { +void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { SFilePage* pPage = NULL; int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool); @@ -1740,7 +1994,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS } } - pItem->tuplePos.pageId = pCtx->curBufPage; + pPos->pageId = pCtx->curBufPage; // keep the current row data, extract method int32_t offset = 0; @@ -1764,17 +2018,17 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS offset += pCol->info.bytes; } - pItem->tuplePos.offset = pPage->num; + pPos->offset = pPage->num; pPage->num += completeRowSize; setBufPageDirty(pPage, true); releaseBufPage(pCtx->pBuf, pPage); } -void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) { - SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId); +void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { + SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId); - bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset); + bool* nullList = (bool*)((char*)pPage + pPos->offset); char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool)); int32_t offset = 0; -- GitLab