From 79ffafb37025f3447365779e31561610f42e4741 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 19 Dec 2020 23:05:58 +0800 Subject: [PATCH] [TD-225] refactor. --- src/query/src/qExecutor.c | 90 ++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 49 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 95a2949950..b0c7cd3a62 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1124,6 +1124,45 @@ static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY return ts; } +static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray *pDataBlock, + SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) { + if (!pRuntimeEnv->timeWindowInterpo) { + return; + } + + assert(pDataBlock != NULL); + + SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0); + + TSKEY *tsCols = (TSKEY *)(pColInfo->pData); + bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP); + if (!done) { + int32_t startRowIndex = startPos; + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } + } else { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); + } + + done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); + if (!done) { + int32_t endRowIndex = startPos + (forwardStep - 1) * step; + + TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey; + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + } + } else { + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP); + } +} + /** * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * @param pRuntimeEnv @@ -1205,31 +1244,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } // window start key interpolation - if (pRuntimeEnv->timeWindowInterpo) { - bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP); - if (!done) { - int32_t startRowIndex = pQuery->pos; - bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, &win); - if (interp) { - setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); - } - } else { - setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); - } - - done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); - if (!done) { - int32_t endRowIndex = pQuery->pos + (forwardStep - 1) * step; - - TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey; - bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, &win); - if (interp) { - setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - } - } else { - setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP); - } - } + doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep); bool pStatus = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows); @@ -1260,30 +1275,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); // window start(end) key interpolation - if (pRuntimeEnv->timeWindowInterpo) { - bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP); - if (!done) { - int32_t startRowIndex = startPos; - bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, &nextWin); - if (interp) { - setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); - } - } else { - setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); - } - - done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); - if (!done) { - int32_t endRowIndex = startPos + (forwardStep - 1)*step; - TSKEY endKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey; - bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, endRowIndex, pDataBlock, tsCols, endKey, &nextWin); - if (interp) { - setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - } - } else { - setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP); - } - } + doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &nextWin, startPos, forwardStep); bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows); -- GitLab