diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index f3bcad8eded26a0715e9546de2b1f59c07b18fef..af9b8d4a10fb5bb66595123c8c4e394a42f89788 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -93,8 +93,10 @@ int32_t diffFunction(SqlFunctionCtx *pCtx); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t firstFunction(SqlFunctionCtx *pCtx); +int32_t firstFunctionMerge(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx); int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getFirstLastInfoSize(int32_t resBytes); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a2849d601e2ce2ea973e2574bbfeb6c61018e297..180ffd18485ba17c8c673e6f60b09e61718f04c4 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1749,7 +1749,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = firstFunction, - .finalizeFunc = firstLastFinalize, + .finalizeFunc = firstLastPartialFinalize, .combineFunc = firstCombine, }, { @@ -1759,7 +1759,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, - .processFunc = firstFunction, + .processFunc = firstFunctionMerge, .finalizeFunc = firstLastFinalize, .combineFunc = firstCombine, }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 44418685b0108c3e54d7b1684d2e14372c18a86c..1bb27780bcacd4562957d05da49c607316eb6074 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -72,6 +72,12 @@ typedef struct STopBotRes { STopBotResItem* pItems; } STopBotRes; +typedef struct SFirstLastRes { + bool hasResult; + int32_t bytes; + char buf[]; +} SFirstLastRes; + typedef struct SStddevRes { double result; int64_t count; @@ -2245,12 +2251,12 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) } int32_t getFirstLastInfoSize(int32_t resBytes) { - return resBytes + sizeof(int64_t); + return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t); } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); - pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); + pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t); return true; } @@ -2274,12 +2280,13 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); + SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; int32_t bytes = pInputCol->info.bytes; + pInfo->bytes = bytes; // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { @@ -2297,7 +2304,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { if (blockDataOrder == TSDB_ORDER_ASC) { // filter according to current result firstly if (pResInfo->numOfRes > 0) { - TSKEY ts = *(TSKEY*)(buf + bytes); + TSKEY ts = *(TSKEY*)(pInfo->buf + bytes); if (ts < startKey) { return TSDB_CODE_SUCCESS; } @@ -2313,9 +2320,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { - memcpy(buf, data, bytes); - *(TSKEY*)(buf + bytes) = cts; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; + pInfo->hasResult = true; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; @@ -2326,7 +2334,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { // in case of descending order time stamp serial, which usually happens as the results of the nest query, // all data needs to be check. if (pResInfo->numOfRes > 0) { - TSKEY ts = *(TSKEY*)(buf + bytes); + TSKEY ts = *(TSKEY*)(pInfo->buf + bytes); if (ts < endKey) { return TSDB_CODE_SUCCESS; } @@ -2342,9 +2350,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { - memcpy(buf, data, bytes); - *(TSKEY*)(buf + bytes) = cts; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; + pInfo->hasResult = true; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; break; @@ -2422,6 +2431,39 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput) { + pOutput->bytes = pInput->bytes; + TSKEY *tsIn = (TSKEY*)(pInput->buf + pInput->bytes); + TSKEY *tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); + if (pOutput->hasResult) { + if (*tsIn > *tsOut) { + return; + } + } + *tsOut = *tsIn; + memcpy(pOutput->buf, pInput->buf, pOutput->bytes); + pOutput->hasResult = true; + return; +} + +int32_t firstFunctionMerge(SqlFunctionCtx *pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SFirstLastRes* pInputInfo = (SFirstLastRes *)varDataVal(data); + + firstLastTransferInfo(pInputInfo, pInfo); + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); @@ -2435,6 +2477,24 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getFirstLastInfoSize(pRes->bytes); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pRes, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return 1; +} + int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);