From 75a45e3f5a492a84e8611580738b8b63dc15f1f3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 3 Mar 2021 14:39:16 +0800 Subject: [PATCH] [td-3047] refactor. --- src/query/src/qAggMain.c | 62 ++++++++++++++++++++++++--------------- src/query/src/qExecutor.c | 57 ++++++++++++++++++----------------- 2 files changed, 66 insertions(+), 53 deletions(-) diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 2fb377cb5c..f4dd1f8f8a 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -4085,37 +4085,51 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { } if (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP) { - *(TSKEY *) pCtx->pOutput = pCtx->startTs; + *(TSKEY *)pCtx->pOutput = pCtx->startTs; + } else if (type == TSDB_FILL_NULL) { + setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + } else if (type == TSDB_FILL_SET_VALUE) { + tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); } else { - if (type == TSDB_FILL_NULL) { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); - } else if (type == TSDB_FILL_SET_VALUE) { - tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); - } else if (type == TSDB_FILL_PREV) { - assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType); - } else if (type == TSDB_FILL_NEXT) { - char* d = GET_INPUT_DATA(pCtx, 1); - assignVal(pCtx->pOutput, d, pCtx->outputBytes, pCtx->inputType); - } else if (type == TSDB_FILL_LINEAR) { - char* start = GET_INPUT_DATA(pCtx, 0); - char* end = GET_INPUT_DATA(pCtx, 1); - + if (pCtx->start.key != INT64_MIN && pCtx->start.key < pCtx->startTs && pCtx->end.key > pCtx->startTs) { + // the value of prev/next/linear interpolation is placed in pCtx->start + if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) { + SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->start.val); + } else { + assignVal(pCtx->pOutput, pCtx->start.ptr, pCtx->outputBytes, pCtx->inputType); + } + } else { + // check the timestamp in input buffer TSKEY skey = GET_TS_DATA(pCtx, 0); TSKEY ekey = GET_TS_DATA(pCtx, 1); - SPoint point1 = {.key = skey, .val = start}; - SPoint point2 = {.key = ekey, .val = end }; - SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; + assert(pCtx->start.key == INT64_MIN && skey < pCtx->startTs && ekey > pCtx->startTs); - int32_t srcType = pCtx->inputType; - if (IS_NUMERIC_TYPE(srcType)) { // TODO should find the not null data? - if (isNull(start, srcType) || isNull(end, srcType)) { - setNull(pCtx->pOutput, srcType, pCtx->inputBytes); + if (type == TSDB_FILL_PREV) { + assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType); + } else if (type == TSDB_FILL_NEXT) { + char* val = pCtx->pInput + pCtx->inputBytes; + assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType); + } else if (type == TSDB_FILL_LINEAR) { + char *start = GET_INPUT_DATA(pCtx, 0); + char *end = GET_INPUT_DATA(pCtx, 1); + + + + SPoint point1 = {.key = skey, .val = start}; + SPoint point2 = {.key = ekey, .val = end}; + SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; + + int32_t srcType = pCtx->inputType; + if (IS_NUMERIC_TYPE(srcType)) { // TODO should find the not null data? + if (isNull(start, srcType) || isNull(end, srcType)) { + setNull(pCtx->pOutput, srcType, pCtx->inputBytes); + } else { + taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, srcType); + } } else { - taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, srcType); + setNull(pCtx->pOutput, srcType, pCtx->inputBytes); } - } else { - setNull(pCtx->pOutput, srcType, pCtx->inputBytes); } } } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3439e48d60..35aa7b487d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1126,9 +1126,8 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC } } -void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, - SArray *pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, - int32_t type) { +void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, + int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) { SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; SExprInfo* pExpr = pOperator->pExpr; @@ -1156,11 +1155,19 @@ void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes); - SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; - SPoint point2 = (SPoint){.key = curTs, .val = &v2}; - SPoint point = (SPoint){.key = windowKey, .val = &v}; + if (functionId == TSDB_FUNC_INTERP) { + if (type == RESULT_ROW_START_INTERP) { + pCtx[k].start.key = prevTs; + pCtx[k].start.val = v1; + + pCtx[k].end.key = curTs; + pCtx[k].end.val = v2; + } + } else if (functionId == TSDB_FUNC_TWA) { + SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; + SPoint point2 = (SPoint){.key = curTs, .val = &v2}; + SPoint point = (SPoint){.key = windowKey, .val = &v }; - if (functionId == TSDB_FUNC_TWA) { taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE); if (type == RESULT_ROW_START_INTERP) { @@ -1170,49 +1177,43 @@ void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pCtx[k].end.key = point.key; pCtx[k].end.val = v; } - } else { - if (type == RESULT_ROW_START_INTERP) { - pCtx[k].start.key = prevTs; - pCtx[k].start.val = v1; - - pCtx[k].end.key = curTs; - pCtx[k].end.val = v2; - } } } } -static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, - int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win, int16_t type) { +static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, int32_t pos, + int32_t numOfRows, SArray* pDataBlock, const TSKEY* tsCols, STimeWindow* win) { SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; + bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); + TSKEY curTs = tsCols[pos]; TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0]; // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed. // start exactly from this point, no need to do interpolation - TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey; + TSKEY key = ascQuery? win->skey:win->ekey; if (key == curTs) { setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); return true; } - if (lastTs == INT64_MIN && ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))) { + if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery))) { setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); return true; } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - TSKEY prevTs = ((pos == 0 && QUERY_IS_ASC_QUERY(pQuery)) || (pos == (numOfRows - 1) && !QUERY_IS_ASC_QUERY(pQuery)))? - lastTs:tsCols[pos - step]; + TSKEY prevTs = ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery))? lastTs:tsCols[pos - step]; - doRowwiseTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP); + doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos, + key, RESULT_ROW_START_INTERP); return true; } static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, - int32_t endRowIndex, SArray* pDataBlock, TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { + int32_t endRowIndex, SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { SQueryRuntimeEnv *pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; int32_t numOfOutput = pOperatorInfo->numOfOutput; @@ -1238,7 +1239,8 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFun assert(nextRowIndex >= 0); TSKEY nextKey = tsCols[nextRowIndex]; - doRowwiseTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key, RESULT_ROW_END_INTERP); + doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, actualEndKey, endRowIndex, nextKey, + nextRowIndex, key, RESULT_ROW_END_INTERP); return true; } @@ -1251,9 +1253,6 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc } assert(pBlock != NULL); - - int32_t fillType = pQuery->fillType; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0); @@ -1263,7 +1262,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc if (!done) { // it is not interpolated, now start to generated the interpolated value int32_t startRowIndex = startPos; bool interp = setTimeWindowInterpolationStartTs(pOperatorInfo, pCtx, startRowIndex, pBlock->info.rows, pBlock->pDataBlock, - tsCols, win, fillType); + tsCols, win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } @@ -1342,7 +1341,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul numOfOutput, pInfo->rowCellInfoOffset); assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - doRowwiseTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], + doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); -- GitLab