From d1eb6f8333197372fa4f099409941497e927e8da Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Mar 2021 16:51:45 +0800 Subject: [PATCH] [td-2895] refactor. --- src/query/src/qExecutor.c | 98 +++++++++------------------------------ 1 file changed, 21 insertions(+), 77 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index a120124aea..cefa52ff80 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -33,7 +33,6 @@ #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) -#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} @@ -54,14 +53,6 @@ typedef enum SResultTsInterpType { RESULT_ROW_END_INTERP = 2, } SResultTsInterpType; -typedef struct { - int32_t status; // query status - TSKEY lastKey; // the lastKey value before query executed - STimeWindow w; // whole query time window - int32_t windowIndex; // index of active time window result for interval query - STSCursor cur; -} SQueryStatusInfo; - #if 0 static UNUSED_FUNC void *u_malloc (size_t __size) { uint32_t v = rand(); @@ -695,13 +686,14 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, } } -static UNUSED_FUNC void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery, bool timeWindowInterpo) { - if ((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey && (!ascQuery))) { +static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, STimeWindow* pWindow, TSKEY lastKey, + bool ascQuery, bool timeWindowInterpo) { + if ((lastKey > pWindow->ekey && ascQuery) || (lastKey < pWindow->ekey && (!ascQuery))) { closeAllResultRows(pResultRowInfo); pResultRowInfo->curIndex = pResultRowInfo->size - 1; } else { - int32_t step = ascQuery? 1:-1; - doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey - step, ascQuery, timeWindowInterpo); + int32_t step = ascQuery ? 1 : -1; + doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, timeWindowInterpo); } } @@ -1179,13 +1171,16 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc } } -static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, - STableIntervalOperatorInfo* pInfo, SSDataBlock* pSDataBlock, int32_t groupId) { +static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t groupId) { + STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info; + SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; int32_t numOfOutput = pOperatorInfo->numOfOutput; SQuery* pQuery = pRuntimeEnv->pQuery; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); + int32_t prevIndex = curTimeWindowIndex(pResultRowInfo); TSKEY* tsCols = NULL; @@ -1196,11 +1191,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); } - int32_t startPos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : (pSDataBlock->info.rows - 1); + int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1); TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQuery); - bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN) ? true : false; + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); SResultRow* pResult = NULL; int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx, @@ -1277,9 +1272,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } if (pQuery->timeWindowInterpo) { - int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pSDataBlock->info.rows-1:0; + int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0; saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); } + + updateResultRowInfoActiveIndex(pResultRowInfo, &pQuery->window, pQuery->current->lastKey, ascQuery, pQuery->timeWindowInterpo); } static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { @@ -3174,7 +3171,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex, TSKEY nextKey) { - STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; + STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; // lastKey needs to be updated pTableQueryInfo->lastKey = nextKey; @@ -3183,8 +3180,8 @@ void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, i } int64_t uid = 0; - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pInfo->resultRowInfo, (char *)&groupIndex, - sizeof(groupIndex), true, uid); + SResultRow* pResultRow = + doPrepareResultRowFromKey(pRuntimeEnv, &pInfo->resultRowInfo, (char*)&groupIndex, sizeof(groupIndex), true, uid); assert (pResultRow != NULL); /* @@ -3356,16 +3353,6 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) { pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; } -bool requireTimestamp(SQuery *pQuery) { - for (int32_t i = 0; i < pQuery->numOfOutput; i++) { - int32_t functionId = pQuery->pExpr1[i].base.functionId; - if ((aAggs[functionId].status & TSDB_FUNCSTATE_NEED_TS) != 0) { - return true; - } - } - return false; -} - /** * copyToOutputBuf support copy data in ascending/descending order * For interval query of both super table and table, copy the data in ascending order, since the output results are @@ -3801,49 +3788,6 @@ void queryCostStatis(SQInfo *pQInfo) { static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); -static UNUSED_FUNC void setTableQueryHandle(SQueryRuntimeEnv* pRuntimeEnv, int32_t tableIndex) { - SQuery* pQuery = pRuntimeEnv->pQuery; - - int32_t numOfGroup = (int32_t) GET_NUM_OF_TABLEGROUP(pRuntimeEnv); - - STableQueryInfo* pCheckInfo = NULL; - if (numOfGroup == 1) { - SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0); - pCheckInfo = taosArrayGetP(group, tableIndex); - } else { - assert(numOfGroup == pRuntimeEnv->tableqinfoGroupInfo.numOfTables); - SArray *group = GET_TABLEGROUP(pRuntimeEnv, tableIndex); - pCheckInfo = taosArrayGetP(group, 0); - } - - // handle the first table - STsdbQueryCond cond = { - .twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey}, - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - .loadExternalRows = isPointInterpoQuery(pQuery), - }; - - SArray *g1 = taosArrayInit(1, POINTER_BYTES); - SArray *tx = taosArrayInit(1, sizeof(STableKeyInfo)); - - STableKeyInfo info = {.pTable = pCheckInfo->pTable, .lastKey = pCheckInfo->lastKey}; - taosArrayPush(tx, &info); - - taosArrayPush(g1, &tx); - STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; - - if (pRuntimeEnv->pQueryHandle == NULL) { - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pRuntimeEnv->qinfo, &pQuery->memRef); - } else { - tsdbResetQueryHandleForNewTable(pRuntimeEnv->pQueryHandle, &cond, &gp); - } - - taosArrayDestroy(tx); - taosArrayDestroy(g1); -} - static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -4437,7 +4381,7 @@ static SSDataBlock* doSTableAggregate(void* param) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; + TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k); doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock); } @@ -4593,7 +4537,7 @@ static SSDataBlock* doIntervalAgg(void* param) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); - hashIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pIntervalInfo, pBlock, 0); + hashIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pBlock, 0); } // restore the value @@ -4651,7 +4595,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); - hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pIntervalInfo, pBlock, pTableQueryInfo->groupIndex); + hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); } pOperator->status = OP_RES_TO_RETURN; -- GitLab