From 0e98a4eadba66eafda9abbfe3f2a0e6e40f0ab6e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Nov 2020 18:07:33 +0800 Subject: [PATCH] [TD-2129]: fix bug in twa calculation. --- src/client/src/tscFunctionImpl.c | 60 ++++-- src/query/inc/qExecutor.h | 11 +- src/query/inc/tsqlfunction.h | 19 +- src/query/src/qExecutor.c | 318 +++++++++++++++++++++++++++---- src/query/src/qUtil.c | 3 +- 5 files changed, 343 insertions(+), 68 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 2ed476f063..0f5f4cb89a 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -64,13 +64,13 @@ } \ } while (0); -#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ -do {\ -for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ - SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \ - aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \ - } \ -} while(0); +#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ + do { \ + for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ + SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \ + aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \ + } \ + } while (0); void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {} void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {} @@ -3625,11 +3625,10 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) { } SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo); - - pInfo->lastKey = INT64_MIN; - pInfo->type = pCtx->inputType; - + + STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + pInfo->lastKey = INT64_MIN; + pInfo->win = TSWINDOW_INITIALIZER; return true; } @@ -3640,10 +3639,24 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - if (pInfo->lastKey == INT64_MIN) { - pInfo->lastKey = pCtx->nStartQueryTimestamp; + + if (pCtx->start.key != INT64_MIN) { + assert(pCtx->start.key < primaryKey[index] && pInfo->lastKey == INT64_MIN); + + pInfo->lastKey = primaryKey[index]; GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0)); + + pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (primaryKey[index] - pCtx->start.key); + pInfo->hasResult = DATA_SET_FLAG; + pInfo->win.skey = pCtx->start.key; + notNullElems++; + } else if (pInfo->lastKey == INT64_MIN) { + pInfo->lastKey = primaryKey[index]; + GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0)); + + pInfo->hasResult = DATA_SET_FLAG; + pInfo->win.skey = pInfo->lastKey; notNullElems++; } @@ -3732,6 +3745,14 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si default: assert(0); } + // the last interpolated time window value + if (pCtx->end.key != INT64_MIN) { + pInfo->dOutput += ((pInfo->lastValue + pCtx->end.val) / 2) * (pCtx->end.key - pInfo->lastKey); + pInfo->lastValue = pCtx->end.val; + pInfo->lastKey = pCtx->end.key; + } + + pInfo->win.ekey = pInfo->lastKey; return notNullElems; } @@ -3751,7 +3772,7 @@ static void twa_function(SQLFunctionCtx *pCtx) { return; } - int32_t notNullElems = twa_function_impl(pCtx, 0, pCtx->size); + int32_t notNullElems = twa_function_impl(pCtx, pCtx->startOffset, pCtx->size); SET_VAL(pCtx, notNullElems, 1); if (notNullElems > 0) { @@ -3795,8 +3816,7 @@ static void twa_func_merge(SQLFunctionCtx *pCtx) { numOfNotNull++; pBuf->dOutput += pInput->dOutput; - pBuf->SKey = pInput->SKey; - pBuf->EKey = pInput->EKey; + pBuf->win = pInput->win; pBuf->lastKey = pInput->lastKey; } @@ -3824,17 +3844,17 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo); - assert(pInfo->EKey >= pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult); + assert(pInfo->win.ekey == pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult); if (pInfo->hasResult != DATA_SET_FLAG) { setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); return; } - if (pInfo->SKey == pInfo->EKey) { + if (pInfo->win.ekey == pInfo->win.skey) { *(double *)pCtx->aOutputBuf = pInfo->lastValue; } else { - *(double *)pCtx->aOutputBuf = pInfo->dOutput / (pInfo->EKey - pInfo->SKey); + *(double *)pCtx->aOutputBuf = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey); } GET_RES_INFO(pCtx)->numOfRes = 1; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 895b414a56..71fc01923a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -63,9 +63,11 @@ typedef struct SSqlGroupbyExpr { typedef struct SResultRow { int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer - int32_t rowId:15; - bool closed:1; // this result status: closed or opened - uint16_t numOfRows; // number of rows of current time window + int32_t rowId:29; // row index in buffer page + bool startInterp; // the time window start timestamp has done the interpolation already. + bool endInterp; // the time window end timestamp has done the interpolation already. + bool closed; // this result status: closed or opened + uint32_t numOfRows; // number of rows of current time window SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo union {STimeWindow win; char* key;}; // start key of current time window } SResultRow; @@ -187,6 +189,7 @@ typedef struct SQueryRuntimeEnv { bool topBotQuery; // false bool groupbyNormalCol; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not + bool timeWindowInterpo;// if the time window start/end required interpolation int32_t interBufSize; // intermediate buffer sizse int32_t prevGroupId; // previous executed group id SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file @@ -195,6 +198,8 @@ typedef struct SQueryRuntimeEnv { SResultRowPool* pool; // window result object pool int32_t* rowCellInfoOffset;// offset value for each row result cell info + char** prevRow; + char** nextRow; } SQueryRuntimeEnv; enum { diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 43f78a56ef..5a923db52c 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -152,6 +152,11 @@ typedef struct SResultRowCellInfo { uint32_t numOfRes; // num of output result in current buffer } SResultRowCellInfo; +typedef struct SPoint1 { + int64_t key; + double val; +} SPoint1; + #define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo))) struct SQLFunctionCtx; @@ -194,6 +199,8 @@ typedef struct SQLFunctionCtx { SResultRowCellInfo *resultInfo; SExtTagsInfo tagInfo; + SPoint1 start; + SPoint1 end; } SQLFunctionCtx; typedef struct SQLAggFuncElem { @@ -243,13 +250,11 @@ enum { }; typedef struct STwaInfo { - TSKEY lastKey; - int8_t hasResult; // flag to denote has value - int16_t type; // source data type - TSKEY SKey; - TSKEY EKey; - double dOutput; - double lastValue; + TSKEY lastKey; + int8_t hasResult; // flag to denote has value + double dOutput; + double lastValue; + STimeWindow win; } STwaInfo; /* global sql function array */ diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 949616bb43..fd52f6ec59 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -27,6 +27,7 @@ #include "query.h" #include "queryLog.h" #include "tlosertree.h" +#include "ttype.h" #define MAX_ROWS_PER_RESBUF_PAGE ((1u<<12) - 1) @@ -194,6 +195,7 @@ static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo * static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo); static int32_t checkForQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables); +static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { @@ -400,6 +402,17 @@ static bool isTopBottomQuery(SQuery *pQuery) { return false; } +static bool timeWindowInterpoRequired(SQuery *pQuery) { + for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { + int32_t functionId = pQuery->pExpr1[i].base.functionId; + if (functionId == TSDB_FUNC_TWA) { + return true; + } + } + + return false; +} + static bool hasTagValOutput(SQuery* pQuery) { SExprInfo *pExprInfo = &pQuery->pExpr1[0]; if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) { @@ -596,7 +609,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf } static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, SDataBlockInfo* pBockInfo, - STimeWindow *win, bool masterscan, bool* newWind) { + STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; @@ -604,6 +617,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes if (pResultRow == NULL) { *newWind = false; + // no master scan, no result generated means error occurs return masterscan? -1:0; } @@ -619,15 +633,40 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes // set time window for current result pResultRow->win = (*win); + *pResult = pResultRow; setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow); + return TSDB_CODE_SUCCESS; } -static bool getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int32_t slot) { +static bool getResultRowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) { assert(slot >= 0 && slot < pWindowResInfo->size); return pWindowResInfo->pResult[slot]->closed; } +typedef enum SResultTsInterpType { + RESULT_ROW_START_INTERP = 1, + RESULT_ROW_END_INTERP = 2, +} SResultTsInterpType; + +static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { + assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); + if (type == RESULT_ROW_START_INTERP) { + pResult->startInterp = true; + } else { + pResult->endInterp = true; + } +} + +static bool isResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { + assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); + if (type == RESULT_ROW_START_INTERP) { + return pResult->startInterp == true; + } else { + return pResult->endInterp == true; + } +} + static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos, int16_t order, int64_t *pData) { int32_t forwardStep = 0; @@ -991,6 +1030,113 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas return dataBlock; } +// window start key interpolation +static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + TSKEY start = tsCols[pos]; + TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0]; + TSKEY prevTs = (pos == 0)? lastTs : tsCols[pos - 1]; + + // if lastTs == INT64_MIN, it is the first block, no need to do the start time interpolation + if (((lastTs != INT64_MIN && pos >= 0) || (lastTs == INT64_MIN && pos > 0)) && win->skey > lastTs && + win->skey < start) { + + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); + if (k == 0 && pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + assert(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); + continue; + } + + double v1 = 0, v2 = 0, v = 0; + + char *prevVal = pos == 0 ? pRuntimeEnv->prevRow[k] : pColInfo->pData + (pos - 1) * pColInfo->info.bytes; + + GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal); + GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes); + + SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; + SPoint point2 = (SPoint){.key = start, .val = &v2}; + SPoint point = (SPoint){.key = win->skey, .val = &v}; + taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); + pRuntimeEnv->pCtx[k].start.key = point.key; + pRuntimeEnv->pCtx[k].start.val = v; + } + + return true; + } else { + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + pRuntimeEnv->pCtx[k].start.key = INT64_MIN; + } + + return false; + } +} + +static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, TSKEY ekey, STimeWindow* win) { + SQuery* pQuery = pRuntimeEnv->pQuery; + TSKEY trueEndKey = tsCols[pos]; + + if (win->ekey < ekey && win->ekey != trueEndKey) { + int32_t nextIndex = pos + 1; + TSKEY next = tsCols[nextIndex]; + + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); + if (k == 0 && pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP && + pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + continue; + } + + double v1 = 0, v2 = 0, v = 0; + GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes); + GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + nextIndex * pColInfo->info.bytes); + + SPoint point1 = (SPoint){.key = trueEndKey, .val = &v1}; + SPoint point2 = (SPoint){.key = next, .val = &v2}; + SPoint point = (SPoint){.key = win->ekey, .val = &v}; + taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); + pRuntimeEnv->pCtx[k].end.key = point.key; + pRuntimeEnv->pCtx[k].end.val = v; + } + + return true; + } else { // current time window does not ended in current data block, do nothing + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + pRuntimeEnv->pCtx[k].end.key = INT64_MIN; + } + + return false; + } +} + +static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock) { + if (pDataBlock == NULL) { + return; + } + + SQuery* pQuery = pRuntimeEnv->pQuery; + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); + memcpy(pRuntimeEnv->prevRow[k], pColInfo->pData + (pColInfo->info.bytes * (pDataBlockInfo->rows - 1)), + pColInfo->info.bytes); + } +} + +static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY* tsCols, int32_t step) { + TSKEY ts = TSKEY_INITIAL_VAL; + + if (tsCols == NULL) { + ts = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.skey : pDataBlockInfo->window.ekey; + } else { + int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); + ts = tsCols[offset]; + } + + return ts; +} + /** * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * @param pRuntimeEnv @@ -1004,12 +1150,12 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); SQuery *pQuery = pRuntimeEnv->pQuery; TSKEY *tsCols = NULL; if (pDataBlock != NULL) { - SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, 0); + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0); tsCols = (TSKEY *)(pColInfo->pData); } @@ -1018,7 +1164,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId); @@ -1026,18 +1172,13 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - TSKEY ts = TSKEY_INITIAL_VAL; + TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step); - if (tsCols == NULL) { - ts = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.skey:pDataBlockInfo->window.ekey; - } else { - int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); - ts = tsCols[offset]; - } - - bool hasTimeWindow = false; + bool hasTimeWindow = false; + SResultRow* pResult = NULL; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult); + if (ret != TSDB_CODE_SUCCESS) { tfree(sasArray); return; } @@ -1050,7 +1191,27 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * TSKEY ekey = reviseWindowEkey(pQuery, &win); forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); - bool pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); + // window start key interpolation + if (pRuntimeEnv->timeWindowInterpo) { + bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pQuery->pos, pDataBlock, tsCols, &win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } + } + + alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols, + pDataBlockInfo->window.ekey, &win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + } + } + } + + bool pStatus = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows); } @@ -1066,7 +1227,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * // null data, failed to allocate more memory buffer hasTimeWindow = false; - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) != + TSDB_CODE_SUCCESS) { break; } @@ -1077,7 +1239,26 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * TSKEY ekey = reviseWindowEkey(pQuery, &nextWin); forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); - bool closed = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); + // window start(end) key interpolation + if (pRuntimeEnv->timeWindowInterpo) { + bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } + } + + alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + } + } + } + + bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows); } @@ -1097,7 +1278,11 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } } - for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { + if (pRuntimeEnv->timeWindowInterpo) { + saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock); + } + + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) { continue; } @@ -1320,20 +1505,20 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS pQuery->order.order, pRuntimeEnv->pTSBuf->cur.order); } - int32_t j = 0; int32_t offset = -1; +// TSKEY prev = -1; - for (j = 0; j < pDataBlockInfo->rows; ++j) { + for (int32_t j = 0; j < pDataBlockInfo->rows; ++j) { offset = GET_COL_DATA_POS(pQuery, j, step); if (pRuntimeEnv->pTSBuf != NULL) { - int32_t r = doTSJoinFilter(pRuntimeEnv, offset); - if (r == TS_JOIN_TAG_NOT_EQUALS) { + int32_t ret = doTSJoinFilter(pRuntimeEnv, offset); + if (ret == TS_JOIN_TAG_NOT_EQUALS) { break; - } else if (r == TS_JOIN_TS_NOT_EQUALS) { + } else if (ret == TS_JOIN_TS_NOT_EQUALS) { continue; } else { - assert(r == TS_JOIN_TS_EQUAL); + assert(ret == TS_JOIN_TS_EQUAL); } } @@ -1346,8 +1531,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS int64_t ts = tsCols[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - bool hasTimeWindow = false; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow); + bool hasTimeWindow = false; + SResultRow* pResult = NULL; + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -1355,8 +1541,28 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS if (!hasTimeWindow) { continue; } +/* + // window start key interpolation + if (pRuntimeEnv->timeWindowInterpo) { + bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pos, pDataBlock, tsCols, &win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } + } - bool closed = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); + alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols, + pDataBlockInfo->window.ekey, &win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + } + } + } +*/ + bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset); STimeWindow nextWin = win; @@ -1375,12 +1581,32 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS // null data, failed to allocate more memory buffer hasTimeWindow = false; - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) != TSDB_CODE_SUCCESS) { break; } if (hasTimeWindow) { - closed = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); +/* + // window start(end) key interpolation + if (pRuntimeEnv->timeWindowInterpo) { + bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } + } + + alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + } + } + } +*/ + closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset); } } @@ -1405,6 +1631,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } +// prev = tsCols[offset]; + if (pRuntimeEnv->pTSBuf != NULL) { // if timestamp filter list is empty, quit current query if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) { @@ -1530,10 +1758,10 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY * top/bottom values emerge, so does diff function */ if (functionId == TSDB_FUNC_TWA) { - SResultRowCellInfo* pInfo = GET_RES_INFO(pCtx); - STwaInfo *pTWAInfo = (STwaInfo*) GET_ROWCELL_INTERBUF(pInfo); - pTWAInfo->SKey = pQuery->window.skey; - pTWAInfo->EKey = pQuery->window.ekey; + pCtx->param[1].i64Key = pQuery->window.skey; + pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT; + pCtx->param[2].i64Key = pQuery->window.ekey; + pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; } } else if (functionId == TSDB_FUNC_ARITHM) { @@ -1679,6 +1907,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pCtx->functionId = pSqlFuncMsg->functionId; pCtx->stableQuery = pRuntimeEnv->stableQuery; pCtx->interBufBytes = pQuery->pExpr1[i].interBytes; + pCtx->start.key = INT64_MIN; + pCtx->end.key = INT64_MIN; pCtx->numOfParams = pSqlFuncMsg->numOfParams; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { @@ -1713,6 +1943,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order } + *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; + // if it is group by normal column, do not set output buffer, the output buffer is pResult // fixed output query/multi-output query for normal table if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { @@ -1783,6 +2015,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->offset); tfree(pRuntimeEnv->keyBuf); tfree(pRuntimeEnv->rowCellInfoOffset); + tfree(pRuntimeEnv->prevRow); taosHashCleanup(pRuntimeEnv->pResultRowHashTable); pRuntimeEnv->pResultRowHashTable = NULL; @@ -2281,12 +2514,14 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pW // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer if (QUERY_IS_INTERVAL_QUERY(pQuery)) { bool hasTimeWindow = false; + SResultRow* pResult = NULL; + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey; STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow) != + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow, &pResult) != TSDB_CODE_SUCCESS) { // todo handle error in set result for timewindow } @@ -3268,7 +3503,7 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SWindowR SQuery* pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - bool closed = getTimeWindowResStatus(pWindowResInfo, i); + bool closed = getResultRowStatus(pWindowResInfo, i); if (!closed) { continue; } @@ -4619,6 +4854,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery); pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery); + pRuntimeEnv->timeWindowInterpo = timeWindowInterpoRequired(pQuery); setScanLimitationByResultBuffer(pQuery); @@ -6449,9 +6685,11 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou goto _cleanup; } + int32_t srcSize = 0; for (int16_t i = 0; i < numOfCols; ++i) { pQuery->colList[i] = pQueryMsg->colList[i]; pQuery->colList[i].filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pQuery->colList[i].numOfFilters); + srcSize += pQuery->colList[i].bytes; } // calculate the result row size @@ -6520,6 +6758,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW); pQInfo->runtimeEnv.pool = initResultRowPool(getWindowResultSize(&pQInfo->runtimeEnv)); + pQInfo->runtimeEnv.prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + srcSize); + + char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pQInfo->runtimeEnv.prevRow; + pQInfo->runtimeEnv.prevRow[0] = start; + + for(int32_t i = 1; i < pQuery->numOfCols; ++i) { + pQInfo->runtimeEnv.prevRow[i] = pQInfo->runtimeEnv.prevRow[i - 1] + pQuery->colList[i-1].bytes; + } pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); if (pQInfo->pBuf == NULL) { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 2b4cdef9b0..d736f2b602 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -389,8 +389,7 @@ uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) { } SQuery* pQuery = pRuntimeEnv->pQuery; - if ((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) || - pRuntimeEnv->groupbyNormalCol) { + if (pQuery->interval.interval == 0 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyNormalCol) { return 0; } -- GitLab