From 309076de1f929b4633c7782396c23ed5a9d2296a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Mar 2022 14:11:34 +0800 Subject: [PATCH] [td-13039] add first/last query. --- include/libs/function/function.h | 2 +- source/libs/function/inc/builtinsimpl.h | 4 + source/libs/function/src/builtins.c | 22 ++++ source/libs/function/src/builtinsimpl.c | 164 ++++++++++++++++++++---- 4 files changed, 165 insertions(+), 27 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index c01e267c42..fde09e59da 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -163,7 +163,7 @@ typedef struct SInputColumnInfoData { typedef struct SqlFunctionCtx { SInputColumnInfoData input; SResultDataInfo resDataInfo; - uint32_t order; // asc|desc + uint32_t order; // data block scanner order: asc|desc //////////////////////////////////////////////////////////////// int32_t startRow; // start row index int32_t size; // handled processed row number diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 7ba7d7bdcc..3f28f4de7b 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); void minFunction(SqlFunctionCtx* pCtx); void maxFunction(SqlFunctionCtx *pCtx); +bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +void firstFunction(SqlFunctionCtx *pCtx); +void lastFunction(SqlFunctionCtx *pCtx); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index edb0acf075..1d9db9aa71 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -61,6 +61,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = maxFunction, .finalizeFunc = functionFinalizer }, + { + .name = "first", + .type = FUNCTION_TYPE_FIRST, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = firstFunction, + .finalizeFunc = functionFinalizer + }, + { + .name = "last", + .type = FUNCTION_TYPE_LAST, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastFunction, + .finalizeFunc = functionFinalizer + }, { .name = "concat", .type = FUNCTION_TYPE_CONCAT, @@ -98,6 +118,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType }; break; } + case FUNCTION_TYPE_FIRST: + case FUNCTION_TYPE_LAST: case FUNCTION_TYPE_MIN: case FUNCTION_TYPE_MAX: { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index aaaee6d56c..ccac37fd0c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -72,13 +72,12 @@ void countFunction(SqlFunctionCtx *pCtx) { int32_t numOfElem = 0; /* - * 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true; - * 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true; - * 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false; + * 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataAggIsSet == true; + * 2. for general non-primary key columns, pInputCol->hasNull may be true or false, pInput->colDataAggIsSet == true; + * 3. for primary key column, pInputCol->hasNull always be false, pInput->colDataAggIsSet == false; */ SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) { numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull; ASSERT(numOfElem >= 0); @@ -173,7 +172,7 @@ void sumFunction(SqlFunctionCtx *pCtx) { SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); } -bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { +bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SSumRes); return true; } @@ -265,8 +264,7 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { return true; } -bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - SNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); +bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(int64_t); return true; } @@ -278,34 +276,34 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { do { \ for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ - __ctx->fpSet.process(__ctx); \ + __ctx->fpSet.process(__ctx); \ } \ } while (0); -#define DO_UPDATE_SUBSID_RES(ctx, ts) \ - do { \ +#define DO_UPDATE_SUBSID_RES(ctx, ts) \ + do { \ for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \ - SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ - if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ - __ctx->tag.i = (ts); \ - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ - } \ - __ctx->fpSet.process(__ctx); \ - } \ + SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ + if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ + __ctx->tag.i = (ts); \ + __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ + } \ + __ctx->fpSet.process(__ctx); \ + } \ } while (0) #define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ - do { \ - if (((left) < (right)) ^ (sign)) { \ - (left) = (right); \ - DO_UPDATE_SUBSID_RES(ctx, _ts); \ - (num) += 1; \ - } \ + do { \ + if (((left) < (right)) ^ (sign)) { \ + (left) = (right); \ + DO_UPDATE_SUBSID_RES(ctx, _ts); \ + (num) += 1; \ + } \ } while (0) -#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ +#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ do { \ - _t* d = (_t*)((_col)->pData); \ + _t *d = (_t *)((_col)->pData); \ for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \ if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ continue; \ @@ -445,4 +443,118 @@ void minFunction(SqlFunctionCtx *pCtx) { void maxFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = doMinMaxHelper(pCtx, 0); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); -} \ No newline at end of file +} + +bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); + pEnv->calcMemSize = pNode->node.resType.bytes; + return true; +} + +// TODO fix this +// This ordinary first function only handle the data block in ascending order +void firstFunction(SqlFunctionCtx *pCtx) { + if (pCtx->order == TSDB_ORDER_DESC) { + return; + } + + int32_t numOfElems = 0; + + struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + + SColumnInfoData* pInputCol = pInput->pData[0]; + + // All null data column, return directly. + if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { + ASSERT(pInputCol->hasNull == true); + return; + } + + // Check for the first not null data + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + memcpy(buf, data, pInputCol->info.bytes); + // TODO handle the subsidary value +// if (pCtx->ptsList != NULL) { +// TSKEY k = GET_TS_DATA(pCtx, i); +// DO_UPDATE_TAG_COLUMNS(pCtx, k); +// } + + pResInfo->hasResult = DATA_SET_FLAG; + pResInfo->complete = true; + + numOfElems++; + break; + } + + SET_VAL(pResInfo, numOfElems, 1); +} + +void lastFunction(SqlFunctionCtx *pCtx) { + if (pCtx->order != TSDB_ORDER_DESC) { + return; + } + + int32_t numOfElems = 0; + + struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + + SColumnInfoData* pInputCol = pInput->pData[0]; + + // All null data column, return directly. + if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { + ASSERT(pInputCol->hasNull == true); + return; + } + + if (pCtx->order == TSDB_ORDER_DESC) { + for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + memcpy(buf, data, pInputCol->info.bytes); + +// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + + pResInfo->hasResult = DATA_SET_FLAG; + pResInfo->complete = true; // set query completed on this column + numOfElems++; + break; + } + } else { // ascending order + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; + + if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { + pResInfo->hasResult = DATA_SET_FLAG; + memcpy(buf, data, pCtx->inputBytes); + + *(TSKEY*)buf = ts; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + } + + numOfElems++; + break; + } + } + + SET_VAL(pResInfo, numOfElems, 1); +} -- GitLab