From 2305b05869c8238998167798ee0da6a275dab1b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Apr 2022 10:12:30 +0800 Subject: [PATCH] refactor(query): do some internal refactor for executor. --- source/libs/executor/inc/executorimpl.h | 120 ++++++++++++------------ source/libs/executor/src/executorimpl.c | 73 ++++++++------ source/libs/function/src/builtinsimpl.c | 81 +++++++++++----- 3 files changed, 165 insertions(+), 109 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fa86cd2a18..230ccf6ace 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -404,25 +404,29 @@ typedef struct SAggSupporter { SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; -typedef struct STableIntervalOperatorInfo { - SOptrBasicInfo binfo; // basic info - SGroupResInfo groupResInfo; // multiple results build supporter - SInterval interval; // interval info - int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - STimeWindow win; // query time range - bool timeWindowInterpo; // interpolation needed or not - char** pRow; // previous row/tuple of already processed datablock - SAggSupporter aggSup; // aggregate supporter - STableQueryInfo* pCurrent; // current tableQueryInfo struct - int32_t order; // current SSDataBlock scan order - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] - SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. +typedef struct STimeWindowSupp { + int8_t calTrigger; + int64_t waterMark; SColumnInfoData timeWindowData; // query time window info for scalar function execution. - double watermark; // water mark - int32_t trigger; // trigger model +} STimeWindowAggSupp; + +typedef struct STableIntervalOperatorInfo { + SOptrBasicInfo binfo; // basic info + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. + STimeWindow win; // query time range + bool timeWindowInterpo; // interpolation needed or not + char** pRow; // previous row/tuple of already processed datablock + SAggSupporter aggSup; // aggregate supporter + STableQueryInfo* pCurrent; // current tableQueryInfo struct + int32_t order; // current SSDataBlock scan order + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. + STimeWindowAggSupp twAggSup; } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -440,19 +444,19 @@ typedef struct SAggOperatorInfo { } SAggOperatorInfo; typedef struct SProjectOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SSDataBlock* existDataBlock; - SArray* pPseudoColInfo; - SLimit limit; - SLimit slimit; - - uint64_t groupId; - int64_t curSOffset; - int64_t curGroupOutput; - - int64_t curOffset; - int64_t curOutput; + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SSDataBlock* existDataBlock; + SArray* pPseudoColInfo; + SLimit limit; + SLimit slimit; + + uint64_t groupId; + int64_t curSOffset; + int64_t curGroupOutput; + + int64_t curOffset; + int64_t curOutput; } SProjectOperatorInfo; typedef struct SFillOperatorInfo { @@ -467,10 +471,10 @@ typedef struct SFillOperatorInfo { } SFillOperatorInfo; typedef struct { - char* pData; - bool isNull; - int16_t type; - int32_t bytes; + char* pData; + bool isNull; + int16_t type; + int32_t bytes; } SGroupKeys, SStateKeys; typedef struct SGroupbyOperatorInfo { @@ -489,9 +493,9 @@ typedef struct SGroupbyOperatorInfo { } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { - uint64_t groupId; - int64_t numOfRows; - SArray* pPageList; + uint64_t groupId; + int64_t numOfRows; + SArray* pPageList; } SDataGroupInfo; // The sort in partition may be needed later. @@ -506,9 +510,8 @@ typedef struct SPartitionOperatorInfo { SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file int32_t rowCapacity; // maximum number of rows for each buffer page int32_t* columnOffset; // start position for each column data - - void* pGroupIter; // group iterator - int32_t pageIndex; // page index of current group + void* pGroupIter; // group iterator + int32_t pageIndex; // page index of current group } SPartitionOperatorInfo; typedef struct SWindowRowsSup { @@ -519,13 +522,13 @@ typedef struct SWindowRowsSup { } SWindowRowsSup; typedef struct SSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - bool reptScan; // next round scan - int64_t gap; // session window gap - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + bool reptScan; // next round scan + int64_t gap; // session window gap + STimeWindowAggSupp twAggSup; } SSessionAggOperatorInfo; typedef struct STimeSliceOperatorInfo { @@ -535,14 +538,14 @@ typedef struct STimeSliceOperatorInfo { } STimeSliceOperatorInfo; typedef struct SStateWindowOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - int32_t colIndex; // start row index - bool hasKey; - SStateKeys stateKey; - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + int32_t colIndex; // start row index + bool hasKey; + SStateKeys stateKey; + STimeWindowAggSupp twAggSup; // bool reptScan; } SStateWindowOperatorInfo; @@ -640,10 +643,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, - const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, @@ -656,7 +660,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0fdb523e64..eaf09237de 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1097,6 +1097,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pInput->totalRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows; pInput->startRowIndex = 0; + + pInput->pPTS = taosArrayGet(pBlock->pDataBlock, 0); // todo set the correct timestamp column ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { if (createDummyCol) { @@ -1516,8 +1518,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // window start key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false); - updateTimeWindowInfo(&pInfo->timeWindowData, &win, true); - doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); + doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); STimeWindow nextWin = win; while (1) { @@ -1548,8 +1550,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); - updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true); - doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } if (pInfo->timeWindowInterpo) { @@ -1620,8 +1622,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); + doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j); @@ -1637,8 +1639,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false); - doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); + doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -5338,8 +5340,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); + doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j); @@ -5355,8 +5357,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false); - doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); + doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { @@ -5863,30 +5865,30 @@ _error: } SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, - const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, + SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; - pInfo->execModel = pTaskInfo->execModel; - - pInfo->win = pTaskInfo->window; - pInfo->win.skey = 0; - pInfo->win.ekey = INT64_MAX; - - pInfo->primaryTsIndex = primaryTsSlot; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; + pInfo->execModel = pTaskInfo->execModel; + pInfo->win = pTaskInfo->window; + pInfo->win.skey = 0; + pInfo->win.ekey = INT64_MAX; + pInfo->primaryTsIndex = primaryTsSlotId; + pInfo->twAggSup = *pTwAggSupp; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, numOfRows); int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); - initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { @@ -5955,7 +5957,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* return NULL; } -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSup, + SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5969,6 +5972,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pInfo->twAggSup = *pTwAggSup; + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + pOperator->name = "StateWindowOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; pOperator->blockingOptr = true; @@ -5992,7 +5998,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf } SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -6008,8 +6014,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo goto _error; } + pInfo->twAggSup = *pTwAggSupp; initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - initExecTimeWindowInfo(&pInfo->timeWindowData, &pTaskInfo->window); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->gap = gap; pInfo->binfo.pRes = pResBlock; @@ -6559,8 +6566,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision }; + STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType}; + int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId; - pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; @@ -6571,9 +6580,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; + STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType}; + SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, &as, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); @@ -6584,9 +6595,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode; + STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); + pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 0d7e984ae7..3b674759a3 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -771,13 +771,9 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -// TODO fix this -// This ordinary first function only handle the data block in ascending order +// This ordinary first function does not care if current scan is ascending order or descending order scan +// the OPTIMIZED version of first function will only handle the ascending order scan int32_t firstFunction(SqlFunctionCtx *pCtx) { - if (pCtx->order == TSDB_ORDER_DESC) { - return 0; - } - int32_t numOfElems = 0; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); @@ -786,29 +782,72 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; + int32_t bytes = pInputCol->info.bytes; + // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { ASSERT(pInputCol->hasNull == true); return 0; } - // Check for the first not null data - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { - continue; + SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL; + + TSKEY startKey = *(int64_t*)(pInput->pPTS ? colDataGetData(pInput->pPTS, 0) : 0); + TSKEY endKey = *(int64_t*)(pInput->pPTS ? colDataGetData(pInput->pPTS, pInput->totalRows - 1) : 0); + + int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; + + if (blockDataOrder == TSDB_ORDER_ASC) { + // filter according to current result firstly + if (pResInfo->numOfRes > 0) { + TSKEY ts = *(TSKEY*)(buf + bytes); + if (ts < startKey) { + return TSDB_CODE_SUCCESS; + } } - char* data = colDataGetData(pInputCol, i); - memcpy(buf, data, pInputCol->info.bytes); - // TODO handle the subsidary value -// if (pCtx->ptsList != NULL) { -// TSKEY k = GET_TS_DATA(pCtx, i); -// DO_UPDATE_TAG_COLUMNS(pCtx, k); -// } + for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + + TSKEY cts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { + memcpy(buf, data, bytes); + *(TSKEY*)(buf + bytes) = cts; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + } - pResInfo->complete = true; - numOfElems++; - break; + numOfElems++; + } + } else { + // in case of descending order time stamp serial, which usually happens as the results of the nest query, + // all data needs to be check. + if (pResInfo->numOfRes > 0) { + TSKEY ts = *(TSKEY*)(buf + bytes); + if (ts < endKey) { + return TSDB_CODE_SUCCESS; + } + } + + for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + + TSKEY cts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { + memcpy(buf, data, bytes); + *(TSKEY*)(buf + bytes) = cts; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + } + + numOfElems++; + } } SET_VAL(pResInfo, numOfElems, 1); @@ -829,7 +868,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { SColumnInfoData* pInputCol = pInput->pData[0]; // All null data column, return directly. - if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { + if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { ASSERT(pInputCol->hasNull == true); return 0; } -- GitLab