From 6c90a198e3c1edbce735ae17190ab7c382817af4 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 17 Mar 2020 14:00:02 +0800 Subject: [PATCH] support the interpolation search for interval query --- src/client/src/tscFunctionImpl.c | 203 ++++++++++---- src/inc/tinterpolation.h | 2 + src/inc/tsqlfunction.h | 6 +- src/system/detail/inc/vnodeRead.h | 11 +- src/system/detail/src/vnodeQueryImpl.c | 355 +++++++++++++++++++++---- src/util/src/tinterpolation.c | 43 +++ 6 files changed, 497 insertions(+), 123 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 9399d188e4..9104a0e0bc 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -20,6 +20,7 @@ #include "thistogram.h" #include "tinterpolation.h" #include "tlog.h" +#include "tpercentile.h" #include "tscJoinProcess.h" #include "tscSyntaxtreefunction.h" #include "tscompression.h" @@ -27,7 +28,6 @@ #include "ttime.h" #include "ttypes.h" #include "tutil.h" -#include "tpercentile.h" #define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes)) #define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes) @@ -4104,8 +4104,6 @@ static void twa_function(SQLFunctionCtx *pCtx) { if (pResInfo->superTableQ) { memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo)); } - - // pCtx->numOfIteratedElems += notNullElems; } static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { @@ -4138,7 +4136,6 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { pInfo->lastKey = primaryKey[index]; setTWALastVal(pCtx, pData, 0, pInfo); - // pCtx->numOfIteratedElems += 1; pResInfo->hasResult = DATA_SET_FLAG; if (pResInfo->superTableQ) { @@ -4403,10 +4400,8 @@ static double do_calc_rate(const SRateInfo* pRateInfo) { } } - int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey; - duration = (duration + 500) / 1000; - - double resultVal = ((double)diff) / duration; + double duration = (pRateInfo->lastKey - pRateInfo->firstKey) / 1000; + double resultVal = diff / duration; pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%f lastValue:%f CorrectionValue:%f resultVal:%f", pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal); @@ -4447,62 +4442,156 @@ static void rate_function(SQLFunctionCtx *pCtx) { TSKEY *primaryKey = pCtx->ptsList; pTrace("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); - - for (int32_t i = 0; i < pCtx->size; ++i) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - pTrace("%p rate_function() index of null data:%d", pCtx, i); - continue; + + if (pCtx->order == TSQL_SO_ASC) { + // prev interpolation exists + if (pCtx->prev.key != -1) { + pRateInfo->firstValue = pCtx->prev.data; + pRateInfo->firstKey = pCtx->prev.key; + pCtx->prev.key = -1; // clear the flag } - - notNullElems++; + + for (int32_t i = 0; i < pCtx->size; ++i) { + char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + pTrace("%p rate_function() index of null data:%d", pCtx, i); + continue; + } - double v = 0; - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_TINYINT: - v = (double)GET_INT8_VAL(pData); - break; - case TSDB_DATA_TYPE_SMALLINT: - v = (double)GET_INT16_VAL(pData); - break; - case TSDB_DATA_TYPE_INT: - v = (double)GET_INT32_VAL(pData); - break; - case TSDB_DATA_TYPE_BIGINT: - v = (double)GET_INT64_VAL(pData); - break; - case TSDB_DATA_TYPE_FLOAT: - v = (double)GET_FLOAT_VAL(pData); - break; - case TSDB_DATA_TYPE_DOUBLE: - v = (double)GET_DOUBLE_VAL(pData); - break; - default: - assert(0); + notNullElems++; + + double v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (double)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (double)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (double)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (double)GET_INT64_VAL(pData); + break; + case TSDB_DATA_TYPE_FLOAT: + v = (double)GET_FLOAT_VAL(pData); + break; + case TSDB_DATA_TYPE_DOUBLE: + v = (double)GET_DOUBLE_VAL(pData); + break; + default: + assert(0); + } + + if ((-DBL_MAX == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[i]; + + pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey); + } + + if (-DBL_MAX == pRateInfo->lastValue) { + pRateInfo->lastValue = v; + } else if (v < pRateInfo->lastValue) { + pRateInfo->CorrectionValue += pRateInfo->lastValue; + pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue); + } + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[i]; + pTrace("lastValue:%f lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey); } - - if ((-DBL_MAX == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { - pRateInfo->firstValue = v; - pRateInfo->firstKey = primaryKey[i]; + + if (!pCtx->hasNull) { + assert(pCtx->size == notNullElems); + } + + if (pCtx->next.key != -1) { + if (pCtx->next.data < pRateInfo->lastValue) { + pRateInfo->CorrectionValue += pRateInfo->lastValue; + pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue); + } + + pRateInfo->lastValue = pCtx->next.data; + pRateInfo->lastKey = pCtx->next.key; + pCtx->next.key = -1; + } + } else { + if (pCtx->next.key != -1) { + pRateInfo->lastValue = pCtx->next.data; + pRateInfo->lastKey = pCtx->next.key; + pCtx->next.key = -1; + } + + for (int32_t i = pCtx->size - 1; i >= 0; --i) { + char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + pTrace("%p rate_function() index of null data:%d", pCtx, i); + continue; + } + + notNullElems++; + + double v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (double)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (double)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (double)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (double)GET_INT64_VAL(pData); + break; + case TSDB_DATA_TYPE_FLOAT: + v = (double)GET_FLOAT_VAL(pData); + break; + case TSDB_DATA_TYPE_DOUBLE: + v = (double)GET_DOUBLE_VAL(pData); + break; + default: + assert(0); + } + + if ((-DBL_MAX == pRateInfo->lastValue) || (INT64_MIN == pRateInfo->lastKey)) { + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[i]; + pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey); + } + + if (-DBL_MAX == pRateInfo->firstValue) { + pRateInfo->firstValue = v; + } else if (v > pRateInfo->firstValue) { + pRateInfo->CorrectionValue += pRateInfo->firstValue; + pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue); + } + + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[i]; pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey); } - - if (-DBL_MAX == pRateInfo->lastValue) { - pRateInfo->lastValue = v; - } else if (v < pRateInfo->lastValue) { - pRateInfo->CorrectionValue += pRateInfo->lastValue; - pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue); + + if (!pCtx->hasNull) { + assert(pCtx->size == notNullElems); } - - pRateInfo->lastValue = v; - pRateInfo->lastKey = primaryKey[i]; - pTrace("lastValue:%f lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey); - } - - if (!pCtx->hasNull) { - assert(pCtx->size == notNullElems); - } + + if (pCtx->prev.key != -1) { + if (pCtx->prev.data > pRateInfo->firstValue) { + pRateInfo->CorrectionValue += pRateInfo->firstValue; + pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue); + } + + pRateInfo->firstValue = pCtx->prev.data; + pRateInfo->firstKey = pCtx->prev.key; + pCtx->prev.key = -1; + } + + }; SET_VAL(pCtx, notNullElems, 1); diff --git a/src/inc/tinterpolation.h b/src/inc/tinterpolation.h index 77141e14af..6c0f6f921e 100644 --- a/src/inc/tinterpolation.h +++ b/src/inc/tinterpolation.h @@ -83,6 +83,8 @@ int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoTyp int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point); +int taosDoLinearInterpolationD(int32_t type, SPoint* point1, SPoint* point2, SPoint* point); + #ifdef __cplusplus } #endif diff --git a/src/inc/tsqlfunction.h b/src/inc/tsqlfunction.h index 9bc0c1889d..b2e53a931b 100644 --- a/src/inc/tsqlfunction.h +++ b/src/inc/tsqlfunction.h @@ -169,7 +169,7 @@ typedef struct SExtTagsInfo { typedef struct SBoundaryData { TSKEY key; - char* data; + double data; } SBoundaryData; // sql function runtime context @@ -200,8 +200,8 @@ typedef struct SQLFunctionCtx { SResultInfo *resultInfo; SExtTagsInfo tagInfo; - SBoundaryData beforeRow; // this value may be less or equalled to the start time of time window - SBoundaryData afterRow; // this value may be greater or equalled to the end time of time window + SBoundaryData prev; // this value may be less or equalled to the start time of time window + SBoundaryData next; // this value may be greater or equalled to the end time of time window } SQLFunctionCtx; typedef struct SQLAggFuncElem { diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index ef3b6b6e04..0b636db110 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -172,15 +172,16 @@ typedef struct SQueryRuntimeEnv { SWindowResInfo windowResInfo; - // require time stamp that are direct before/after query time window - bool boundaryExternalTS; - STSBuf* pTSBuf; STSCursor cur; SQueryCostSummary summary; bool stableQuery; // is super table query or not SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file + bool hasTimeWindow; + char** lastRowInBlock; + bool interpoSearch; + /* * Temporarily hold the in-memory cache block info during scan cache blocks * Here we do not use the cache block info from pMeterObj, simple because it may change anytime @@ -188,10 +189,6 @@ typedef struct SQueryRuntimeEnv { * So we keep a copy of the support structure as well as the cache block data itself. */ SCacheBlock cacheBlock; - - SPointInterpoSupporter* pInterpoSupporter; - bool hasTimeWindow; - bool interpoSearch; } SQueryRuntimeEnv; /* intermediate pos during multimeter query involves interval */ diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index a5df289449..dab0397959 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1754,7 +1754,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat pCtx[k].size = forwardStep; pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? startPos : startPos - (forwardStep - 1); - if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { + if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0 || functionId == TSDB_FUNC_RATE) { pCtx[k].ptsList = (TSKEY *)((char*)pRuntimeEnv->primaryColBuffer->data + pCtx[k].startOffset * TSDB_KEYSIZE); } @@ -1841,6 +1841,31 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow } } +static void handleInBlockEkeyInterpolation(SQueryRuntimeEnv* pRuntimeEnv, int32_t endPos, + const TSKEY* primaryKeyCol, STimeWindow* win, SQLFunctionCtx* pCtx) { + // this query time window ended in the current data block + SQuery* pQuery = pRuntimeEnv->pQuery; + TSKEY lastKey = primaryKeyCol[endPos]; + + TSKEY e = win->skey + pQuery->intervalTime; + TSKEY next = primaryKeyCol[endPos + 1]; + + // the next key is beyond the query time range + if ((next > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (next > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) { + pCtx->next.key = -1; + return; + } + + pCtx->next.key = e; + char *d = pCtx->aInputElemBuf + pCtx->inputBytes * endPos; + + SPoint point1 = (SPoint){.key = lastKey, .val = d}; + SPoint point2 = (SPoint){.key = next, .val = (d + pCtx->inputBytes)}; + SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); +} + static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) { TSKEY ekey = -1; if (QUERY_IS_ASC_QUERY(pQuery)) { @@ -1858,6 +1883,208 @@ static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) { return ekey; } +static void interpolateEndKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, STimeWindow* win, + int32_t endPos, SQLFunctionCtx* pCtx, int32_t index) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + TSKEY *primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; + + // if current query window beyonds the whole query window, do not employ the interpolation + if ((win->ekey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (win->ekey >= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) { + pCtx->next.key = -1; + return; + } + + if (!QUERY_IS_ASC_QUERY(pQuery) && win->skey >= pBlockInfo->keyLast) { + pCtx->next.key = -1; + return; + } + + if (QUERY_IS_ASC_QUERY(pQuery)) { + + /* + * the time window closed before current data block, use the interpolation to generate + * the final result part, endPos equals to -1 means that this time window ends before current data block. + */ + if (win->ekey < pBlockInfo->keyFirst) { + assert(endPos == -1); + + TSKEY prev = *(int64_t*) pRuntimeEnv->lastRowInBlock[0]; + TSKEY next = pBlockInfo->keyFirst; + + pCtx->next.key = win->skey + pQuery->intervalTime; + + char *d = pCtx->aInputElemBuf; + + SPoint point1 = (SPoint){.key = prev, .val = pRuntimeEnv->lastRowInBlock[index]}; + SPoint point2 = (SPoint){.key = next, .val = d}; + SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); + } else if (win->ekey < pBlockInfo->keyLast) { + handleInBlockEkeyInterpolation(pRuntimeEnv, endPos, primaryKeyCol, win, pCtx); + } else { + //do nothing now, the interpolation will be handled before processing the next data block + assert(win->ekey >= pBlockInfo->keyLast); + pCtx->next.key = -1; + } + } else { // desc order query + + //the time window closed before current data block, use the interpolation to generate the final result part. + if (win->ekey >= pBlockInfo->keyLast) { + TSKEY prev = pBlockInfo->keyLast; + TSKEY next = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0]; + + pCtx->next.key = win->skey + pQuery->intervalTime; + + char *d = pCtx->aInputElemBuf + (pBlockInfo->size - 1) * pCtx->inputBytes; + + SPoint point1 = (SPoint){.key = prev, .val = d}; + SPoint point2 = (SPoint){.key = next, .val = pRuntimeEnv->lastRowInBlock[index]}; + SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); + } else if (win->ekey < pBlockInfo->keyLast) { + handleInBlockEkeyInterpolation(pRuntimeEnv, endPos, primaryKeyCol, win, pCtx); + } else { + pCtx->next.key = -1; + } + } + +} + +static void handleInBlockSkeyInterpolation (SQueryRuntimeEnv* pRuntimeEnv, int32_t startPos, + const TSKEY* primaryKeyCol, STimeWindow* win, SQLFunctionCtx* pCtx) { + assert(startPos > 0); + + SQuery* pQuery = pRuntimeEnv->pQuery; + + TSKEY prev = primaryKeyCol[startPos - 1]; + TSKEY next = primaryKeyCol[startPos]; + + if (!QUERY_IS_ASC_QUERY(pQuery) && prev < pQuery->ekey) { + pCtx->prev.key = -1; + return; + } + + pCtx->prev.key = win->skey; + char *d = pCtx->aInputElemBuf + pCtx->inputBytes * (startPos - 1); + + SPoint point1 = (SPoint){.key = prev, .val = d}; + SPoint point2 = (SPoint){.key = next, .val = (d + pCtx->inputBytes)}; + SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); +} + +static void interpolateStartKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, SWindowResInfo* pWindowResInfo, + STimeWindow* win, int32_t startPos, SQLFunctionCtx* pCtx, int32_t index) { + SQuery* pQuery = pRuntimeEnv->pQuery; + TSKEY *primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; + TSKEY skey = primaryKeyCol[startPos]; + + /* + * no need the start time interpolation + * 1. current window is the first window in either ascending or descending order output + * 2. time window start exactly from a timestamp with data + */ + if (skey == win->skey || win->skey < pWindowResInfo->startTime || + (win->skey < pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || + (win->skey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + pCtx->prev.key = -1; + return; + } + + if (QUERY_IS_ASC_QUERY(pQuery)) { + // the queried time window and time window of data block must be intersect + assert(win->ekey >= pBlockInfo->keyFirst && win->skey <= pBlockInfo->keyLast); + + /* + * this win should not be the first time window that starts from a less timestamp than + * the skey of current data block + */ + if (win->skey < pBlockInfo->keyFirst) { + TSKEY prev = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0]; + TSKEY next = pBlockInfo->keyFirst; + + pCtx->prev.key = win->skey; + char *d = pCtx->aInputElemBuf; + + SPoint point1 = (SPoint){.key = prev, .val = pRuntimeEnv->lastRowInBlock[index]}; + SPoint point2 = (SPoint){.key = next, .val = d}; + SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); + } else { + handleInBlockSkeyInterpolation(pRuntimeEnv, startPos, primaryKeyCol, win, pCtx); + } + } else { // desc order + if (win->skey > pBlockInfo->keyLast) { + //this pBlockInfo located before current time window + TSKEY prev = pBlockInfo->keyLast; + TSKEY next = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0]; + + pCtx->prev.key = win->skey; + char *d = pCtx->aInputElemBuf + (pBlockInfo->size - 1) * pCtx->inputBytes; + + SPoint point1 = (SPoint){.key = prev, .val = d}; + SPoint point2 = (SPoint){.key = next, .val = pRuntimeEnv->lastRowInBlock[index]}; + SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); + } else { + // the queried time window and time window of data block must be intersect + assert(win->ekey >= pBlockInfo->keyFirst && win->skey <= pBlockInfo->keyLast); + + if (win->skey >= pBlockInfo->keyFirst) { + // the pBlockInfo is intersected with query time window + handleInBlockSkeyInterpolation(pRuntimeEnv, startPos, primaryKeyCol, win, pCtx); + } else { + assert(win->skey < pBlockInfo->keyFirst && win->ekey >= pBlockInfo->keyFirst); + pCtx->prev.key = -1; + } + } + } +} + +static void doSetInterpolationDataForTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo *pWindowResInfo, + SBlockInfo* pBlockInfo, STimeWindow* win, int32_t startPos, int32_t forwardStep) { + + SQuery* pQuery = pRuntimeEnv->pQuery; + TSKEY* primaryKeyCol = (TSKEY*) pRuntimeEnv->primaryColBuffer->data; + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + if (pRuntimeEnv->interpoSearch) { + int32_t s = startPos; + int32_t e = forwardStep * step + startPos - step; + + if (!QUERY_IS_ASC_QUERY(pQuery)) { + SWAP(s, e, int32_t); + } + + for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; + + interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); + } + + // the first time window, do not employ the interpolation + if (primaryKeyCol[s] == pWindowResInfo->startTime) { + for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + pRuntimeEnv->pCtx[i].prev.key = -1; + } + } else { + for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; + + interpolateStartKeyValue(pRuntimeEnv, pBlockInfo, pWindowResInfo, win, s, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); + } + } + } +} + /** * * @param pRuntimeEnv @@ -1906,26 +2133,56 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t setExecParams(pRuntimeEnv, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, hasNull, &sasArray[k], pBlockInfo, index); } + + for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + SColumnInfo* pColInfo = &pQuery->colList[i].data; + int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0; + + memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes); + } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (isIntervalQuery(pQuery)) { int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); TSKEY ts = primaryKeyCol[offset]; - + STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); + + // get current not closed time window + int32_t slot = pWindowResInfo->curIndex; + if (slot != -1) { + STimeWindow w = getWindowResult(pWindowResInfo, slot)->window; + + // if current window is closed and locates before current active block, interpolate the result and close it + if (w.skey != win.skey || w.ekey != win.ekey) { + assert((w.ekey < win.skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) || + (w.skey > win.ekey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery))); + + forwardStep = 0; + doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep); + + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep); + + closeTimeWindow(pWindowResInfo, curTimeWindow(pWindowResInfo)); + } + } + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) { return 0; } TSKEY ekey = reviseWindowEkey(pQuery, &win); forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true); - + + doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, offset, forwardStep); + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep); - int32_t index = pWindowResInfo->curIndex; + int32_t index = pWindowResInfo->curIndex; + STimeWindow nextWin = win; - while (1) { int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, pBlockInfo, primaryKeyCol, searchFn); @@ -1941,7 +2198,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ekey = reviseWindowEkey(pQuery, &nextWin); forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true); - + + doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &nextWin, startPos, forwardStep); + pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep); } @@ -1955,6 +2214,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t */ for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + pCtx[k].next.key = -1; + pCtx[k].prev.key = -1; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunction(&pCtx[k]); } @@ -2689,35 +2951,6 @@ void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t int32_t scanFlag = pRuntimeEnv->scanFlag; int32_t blockStatus = pRuntimeEnv->blockStatus; - int64_t* tsArray = (int64_t*)primaryColumnData; - - if (needsBoundaryTS(pQuery)) { - SPointInterpoSupporter* pSupporter = pRuntimeEnv->pInterpoSupporter; - if (pRuntimeEnv->hasTimeWindow) { - if (startQueryTimestamp != tsArray[startOffset]) { - assert(startQueryTimestamp <= tsArray[startOffset]); - pCtx->beforeRow = (SBoundaryData){*(int64_t*)pSupporter->pPrevPoint[0], pSupporter->pPrevPoint[index]}; - } else { - pCtx->beforeRow.key = -1; - } - - if (startOffset + size < pBlockInfo->size) { - if (pQuery->ekey < tsArray[startOffset + size]) { - getOneRowFromDataBlock(pRuntimeEnv, pRuntimeEnv->pInterpoSupporter->pNextPoint, startOffset + size); - pCtx->afterRow = (SBoundaryData){*(int64_t*)pSupporter->pNextPoint[0], pSupporter->pNextPoint[index]}; - } else { - pCtx->afterRow.key = -1; - } - } else {// current query ekey is greater than current data block, do not set the after row value - pCtx->afterRow.key = -1; - } - - } else { - pCtx->beforeRow.key = -1; - pCtx->afterRow.key = -1; - } - } - pCtx->nStartQueryTimestamp = startQueryTimestamp; pCtx->scanFlag = scanFlag; @@ -3487,10 +3720,8 @@ static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMet } else { assert(QUERY_IS_ASC_QUERY(pQuery)); } + assert(pPointInterpSupporter != NULL && pQuery->skey == pQuery->ekey); - - SCacheBlock *pBlock = NULL; - qTrace("QInfo:%p get next data point, fileId:%d, slot:%d, pos:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->slot, pQuery->pos); @@ -3697,13 +3928,11 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, STableQuerySup } // needs the data before the begin timestamp of query time window - if ((*key) != pQuery->skey && needsBoundaryTS(pQuery)) { + if ((*key) != pQuery->skey) { if (!pRuntimeEnv->hasTimeWindow) { - pQuery->skey = nextKey; // change the query skey - return true; - } else { - return loadPrevDataPoint(pRuntimeEnv, pPointInterpSupporter->pPrevPoint); + pQuery->skey = nextKey; // change the query skey } + return true; } else { return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter); } @@ -4560,12 +4789,15 @@ static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *p SQuery *pQuery = pRuntimeEnv->pQuery; // To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system. + pRuntimeEnv->lastRowInBlock = calloc(pQuery->numOfCols, POINTER_BYTES); for (int32_t i = 0; i < pQuery->numOfCols; ++i) { int32_t bytes = pQuery->colList[i].data.bytes; pRuntimeEnv->colDataBuffer[i] = calloc(1, sizeof(SData) + EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes); if (pRuntimeEnv->colDataBuffer[i] == NULL) { goto _error_clean; } + + pRuntimeEnv->lastRowInBlock[i] = calloc(1, bytes); } // record the maximum column width among columns of this meter/metric @@ -4679,9 +4911,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pMeterObj = pMeterObj; - pRuntimeEnv->boundaryExternalTS = needsBoundaryTS(pQuery); pRuntimeEnv->hasTimeWindow = !notHasQueryTimeRange(pQuery); - + pRuntimeEnv->interpoSearch = needsBoundaryTS(pQuery); + if ((code = allocateRuntimeEnvBuf(pRuntimeEnv, pMeterObj)) != TSDB_CODE_SUCCESS) { return code; } @@ -4736,9 +4968,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery /* query on single table */ pSupporter->numOfMeters = 1; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - - pRuntimeEnv->pInterpoSupporter = calloc(1, sizeof(SPointInterpoSupporter)); - pointInterpSupporterInit(pQuery, pRuntimeEnv->pInterpoSupporter); + + SPointInterpoSupporter interpoSupporter = {0}; + pointInterpSupporterInit(pQuery, &interpoSupporter); /* * in case of last_row query without query range, we set the query timestamp to @@ -4746,11 +4978,11 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery */ if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) { - if (!normalizeUnBoundLastRowQuery(pSupporter, pRuntimeEnv->pInterpoSupporter)) { + if (!normalizeUnBoundLastRowQuery(pSupporter, &interpoSupporter)) { sem_post(&pQInfo->dataReady); pQInfo->over = 1; -// pointInterpSupporterDestroy(pRuntimeEnv->pInterpoSupporter); + pointInterpSupporterDestroy(&interpoSupporter); return TSDB_CODE_SUCCESS; } } else { // find the skey and ekey in case of sliding query @@ -4764,23 +4996,34 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery } int64_t skey = 0; - if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, pRuntimeEnv->pInterpoSupporter, &skey) == false) || + if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpoSupporter, &skey) == false) || (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || (isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) { sem_post(&pQInfo->dataReady); pQInfo->over = 1; -// pointInterpSupporterDestroy(&interpInfo); + pointInterpSupporterDestroy(&interpoSupporter); return TSDB_CODE_SUCCESS; } + pQuery->skey = skey; if (!QUERY_IS_ASC_QUERY(pQuery)) { win.skey = minKey; win.ekey = skey; + pQuery->ekey = minKey; } else { win.skey = skey; win.ekey = pQuery->ekey; } + + // empty result + if (QUERY_IS_ASC_QUERY(pQuery) && win.skey > win.ekey) { + sem_post(&pQInfo->dataReady); + pQInfo->over = 1; + + pointInterpSupporterDestroy(&interpoSupporter); + return TSDB_CODE_SUCCESS; + } TSKEY skey1, ekey1; TSKEY windowSKey = 0, windowEKey = 0; @@ -4799,13 +5042,13 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery pQuery->over = QUERY_NOT_COMPLETED; } else { int64_t ekey = 0; - if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, pRuntimeEnv->pInterpoSupporter, &ekey) == false) || + if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpoSupporter, &ekey) == false) || (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || (isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) { sem_post(&pQInfo->dataReady); pQInfo->over = 1; -// pointInterpSupporterDestroy(&interpInfo); + pointInterpSupporterDestroy(&interpoSupporter); return TSDB_CODE_SUCCESS; } } @@ -4815,8 +5058,8 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery * here we set the value for before and after the specified time into the * parameter for interpolation query */ - pointInterpSupporterSetData(pQInfo, pRuntimeEnv->pInterpoSupporter); -// pointInterpSupporterDestroy(&interpInfo); + pointInterpSupporterSetData(pQInfo, &interpoSupporter); + pointInterpSupporterDestroy(&interpoSupporter); if (!forwardQueryStartPosIfNeeded(pQInfo, pSupporter, dataInDisk, dataInCache)) { return TSDB_CODE_SUCCESS; diff --git a/src/util/src/tinterpolation.c b/src/util/src/tinterpolation.c index cb7c8854ce..8c1abd3328 100644 --- a/src/util/src/tinterpolation.c +++ b/src/util/src/tinterpolation.c @@ -191,6 +191,49 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi return 0; } +int taosDoLinearInterpolationD(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) { + switch (type) { + case TSDB_DATA_TYPE_INT: { + *(double*) point->val = doLinearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key, + point2->key, point->key); + break; + } + case TSDB_DATA_TYPE_FLOAT: { + *(double*)point->val = + doLinearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key); + break; + }; + case TSDB_DATA_TYPE_DOUBLE: { + *(double*)point->val = + doLinearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key); + break; + }; + case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_BIGINT: { + *(double*)point->val = doLinearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key, + point2->key, point->key); + break; + }; + case TSDB_DATA_TYPE_SMALLINT: { + *(double*)point->val = doLinearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key, + point2->key, point->key); + break; + }; + case TSDB_DATA_TYPE_TINYINT: { + *(double*)point->val = + doLinearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key); + break; + }; + default: { + // TODO: Deal with interpolation with bool and strings and timestamp + return -1; + } + } + + return 0; +} + + static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; } static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order, -- GitLab