diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 3e4da10aece9d3f928b0c59663a0110882eb9ed2..f931bd472992a22ed7de6e99c1545495a821c620 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -197,7 +197,7 @@ typedef struct SDataBlockList { typedef struct SQueryInfo { int16_t command; // the command may be different for each subclause, so keep it seperately. uint16_t type; // query/insert/import type - char intervalTimeUnit; + char slidingTimeUnit; int64_t etime, stime; int64_t intervalTime; // aggregation time interval diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 9399d188e49aced0d3532fee3ef9a720e4672f61..6892e1a6a790ce47c188644c77946e62613ca49f 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -20,6 +20,7 @@ #include "thistogram.h" #include "tinterpolation.h" #include "tlog.h" +#include "tpercentile.h" #include "tscJoinProcess.h" #include "tscSyntaxtreefunction.h" #include "tscompression.h" @@ -27,7 +28,6 @@ #include "ttime.h" #include "ttypes.h" #include "tutil.h" -#include "tpercentile.h" #define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes)) #define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes) @@ -4104,8 +4104,6 @@ static void twa_function(SQLFunctionCtx *pCtx) { if (pResInfo->superTableQ) { memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo)); } - - // pCtx->numOfIteratedElems += notNullElems; } static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { @@ -4138,7 +4136,6 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { pInfo->lastKey = primaryKey[index]; setTWALastVal(pCtx, pData, 0, pInfo); - // pCtx->numOfIteratedElems += 1; pResInfo->hasResult = DATA_SET_FLAG; if (pResInfo->superTableQ) { @@ -4403,10 +4400,8 @@ static double do_calc_rate(const SRateInfo* pRateInfo) { } } - int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey; - duration = (duration + 500) / 1000; - - double resultVal = ((double)diff) / duration; + double duration = (pRateInfo->lastKey - pRateInfo->firstKey) / 1000.0; + double resultVal = diff / duration; pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%f lastValue:%f CorrectionValue:%f resultVal:%f", pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal); @@ -4447,62 +4442,156 @@ static void rate_function(SQLFunctionCtx *pCtx) { TSKEY *primaryKey = pCtx->ptsList; pTrace("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); - - for (int32_t i = 0; i < pCtx->size; ++i) { - char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - pTrace("%p rate_function() index of null data:%d", pCtx, i); - continue; + + if (pCtx->order == TSQL_SO_ASC) { + // prev interpolation exists + if (pCtx->prev.key != -1) { + pRateInfo->firstValue = pCtx->prev.data; + pRateInfo->firstKey = pCtx->prev.key; + pCtx->prev.key = -1; // clear the flag } - - notNullElems++; + + for (int32_t i = 0; i < pCtx->size; ++i) { + char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + pTrace("%p rate_function() index of null data:%d", pCtx, i); + continue; + } - double v = 0; - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_TINYINT: - v = (double)GET_INT8_VAL(pData); - break; - case TSDB_DATA_TYPE_SMALLINT: - v = (double)GET_INT16_VAL(pData); - break; - case TSDB_DATA_TYPE_INT: - v = (double)GET_INT32_VAL(pData); - break; - case TSDB_DATA_TYPE_BIGINT: - v = (double)GET_INT64_VAL(pData); - break; - case TSDB_DATA_TYPE_FLOAT: - v = (double)GET_FLOAT_VAL(pData); - break; - case TSDB_DATA_TYPE_DOUBLE: - v = (double)GET_DOUBLE_VAL(pData); - break; - default: - assert(0); + notNullElems++; + + double v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (double)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (double)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (double)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (double)GET_INT64_VAL(pData); + break; + case TSDB_DATA_TYPE_FLOAT: + v = (double)GET_FLOAT_VAL(pData); + break; + case TSDB_DATA_TYPE_DOUBLE: + v = (double)GET_DOUBLE_VAL(pData); + break; + default: + assert(0); + } + + if ((-DBL_MAX == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[i]; + + pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey); + } + + if (-DBL_MAX == pRateInfo->lastValue) { + pRateInfo->lastValue = v; + } else if (v < pRateInfo->lastValue) { + pRateInfo->CorrectionValue += pRateInfo->lastValue; + pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue); + } + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[i]; + pTrace("lastValue:%f lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey); } - - if ((-DBL_MAX == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { - pRateInfo->firstValue = v; - pRateInfo->firstKey = primaryKey[i]; + + if (!pCtx->hasNull) { + assert(pCtx->size == notNullElems); + } + + if (pCtx->next.key != -1) { + if (pCtx->next.data < pRateInfo->lastValue) { + pRateInfo->CorrectionValue += pRateInfo->lastValue; + pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue); + } + + pRateInfo->lastValue = pCtx->next.data; + pRateInfo->lastKey = pCtx->next.key; + pCtx->next.key = -1; + } + } else { + if (pCtx->next.key != -1) { + pRateInfo->lastValue = pCtx->next.data; + pRateInfo->lastKey = pCtx->next.key; + pCtx->next.key = -1; + } + + for (int32_t i = pCtx->size - 1; i >= 0; --i) { + char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + pTrace("%p rate_function() index of null data:%d", pCtx, i); + continue; + } + + notNullElems++; + + double v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (double)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (double)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (double)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (double)GET_INT64_VAL(pData); + break; + case TSDB_DATA_TYPE_FLOAT: + v = (double)GET_FLOAT_VAL(pData); + break; + case TSDB_DATA_TYPE_DOUBLE: + v = (double)GET_DOUBLE_VAL(pData); + break; + default: + assert(0); + } + + if ((-DBL_MAX == pRateInfo->lastValue) || (INT64_MIN == pRateInfo->lastKey)) { + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[i]; + pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey); + } + + if (-DBL_MAX == pRateInfo->firstValue) { + pRateInfo->firstValue = v; + } else if (v > pRateInfo->firstValue) { + pRateInfo->CorrectionValue += pRateInfo->firstValue; + pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue); + } + + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[i]; pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey); } - - if (-DBL_MAX == pRateInfo->lastValue) { - pRateInfo->lastValue = v; - } else if (v < pRateInfo->lastValue) { - pRateInfo->CorrectionValue += pRateInfo->lastValue; - pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue); + + if (!pCtx->hasNull) { + assert(pCtx->size == notNullElems); } - - pRateInfo->lastValue = v; - pRateInfo->lastKey = primaryKey[i]; - pTrace("lastValue:%f lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey); - } - - if (!pCtx->hasNull) { - assert(pCtx->size == notNullElems); - } + + if (pCtx->prev.key != -1) { + if (pCtx->prev.data > pRateInfo->firstValue) { + pRateInfo->CorrectionValue += pRateInfo->firstValue; + pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue); + } + + pRateInfo->firstValue = pCtx->prev.data; + pRateInfo->firstKey = pCtx->prev.key; + pCtx->prev.key = -1; + } + + }; SET_VAL(pCtx, notNullElems, 1); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 3b4fc0240cf854eb72e5683db311c58a1e220d96..05e1f16957687bb1236c3b23b4397be8f118b336 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -598,9 +598,6 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000; } - /* parser has filter the illegal type, no need to check here */ - pQueryInfo->intervalTimeUnit = pQuerySql->interval.z[pQuerySql->interval.n - 1]; - // interval cannot be less than 10 milliseconds if (pQueryInfo->intervalTime < tsMinIntervalTime) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); @@ -689,10 +686,15 @@ int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } + + pQueryInfo->slidingTimeUnit = pQuerySql->sliding.z[pQuerySql->sliding.n - 1]; } else { pQueryInfo->slidingTime = pQueryInfo->intervalTime; + + // parser has filter the illegal type, no need to check here + pQueryInfo->slidingTimeUnit = pQuerySql->interval.z[pQuerySql->interval.n - 1]; } - + return TSDB_CODE_SUCCESS; } @@ -1636,13 +1638,16 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt // set the first column ts for diff query if (optr == TK_DIFF) { colIdx += 1; - SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0}; + SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE); SColumnList ids = getColumnList(1, 0, 0); insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName, pExpr); + } else if (optr == TK_RATE) { + SColumnIndex index1 = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscColumnBaseInfoInsert(pQueryInfo, &index1); } // functions can not be applied to tags diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index e84d19086f5e0033a6a5b31e26e9b650904dae62..eac402df86c193a804e8fb4ad37258b27fb5a8fd 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -325,7 +325,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; int64_t revisedSTime = - taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec); + taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec); SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo; taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, @@ -800,7 +800,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; int64_t revisedSTime = - taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec); + taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec); taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize); @@ -944,7 +944,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo while (1) { int32_t remains = taosNumOfRemainPoints(pInterpoInfo); TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime, - pQueryInfo->intervalTimeUnit, precision); + pQueryInfo->slidingTimeUnit, precision); int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime, pLocalReducer->resColModel->capacity); @@ -1296,7 +1296,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; int64_t newTime = - taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision); + taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision); taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime, pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize); @@ -1326,7 +1326,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) { int32_t remain = taosNumOfRemainPoints(pInterpoInfo); TSKEY ekey = - taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, p); + taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, p); int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain, pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity); if (rows > 0) { // do interpo @@ -1359,7 +1359,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime; etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, - pQueryInfo->intervalTimeUnit, precision); + pQueryInfo->slidingTimeUnit, precision); int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime, pLocalReducer->resColModel->capacity); if (rows > 0) { // do interpo diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 94fba04b3e613ac380ff2d3ad1e62afbe24c1ee2..db9dea2384c0eece14f000a6bd62298488954994 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1681,7 +1681,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); - pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit; + pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit; pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); if (pQueryInfo->intervalTime < 0) { diff --git a/src/connector/go b/src/connector/go deleted file mode 160000 index 8c58c512b6acda8bcdfa48fdc7140227b5221766..0000000000000000000000000000000000000000 --- a/src/connector/go +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 4307e41b0b0689c3a9803253b16baccd6685ef5f..56b5a020a641fc9187b2a47fdf25a94f9566b5cc 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -513,7 +513,7 @@ typedef struct { int16_t orderColId; int16_t numOfCols; // the number of columns will be load from vnode - char intervalTimeUnit; // time interval type, for revisement of interval(1d) + char slidingTimeUnit; // time interval type, for revisement of interval(1d) int64_t intervalTime; // time interval for aggregation, in million second int64_t slidingTime; // value for sliding window diff --git a/src/inc/tinterpolation.h b/src/inc/tinterpolation.h index f4b327bcbec82b2b9ca8e2f5c92b044700240dbc..6c0f6f921e4a8214ab440bffefb531995390bd65 100644 --- a/src/inc/tinterpolation.h +++ b/src/inc/tinterpolation.h @@ -30,7 +30,7 @@ typedef struct SInterpolationInfo { char * prevValues; // previous row of data char * nextValues; // next row of data int32_t numOfTags; - char ** pTags; // tags value for current interoplation + char ** pTags; // tags value for current interpolation } SInterpolationInfo; typedef struct SPoint { @@ -83,6 +83,8 @@ int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoTyp int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point); +int taosDoLinearInterpolationD(int32_t type, SPoint* point1, SPoint* point2, SPoint* point); + #ifdef __cplusplus } #endif diff --git a/src/inc/tsqlfunction.h b/src/inc/tsqlfunction.h index b42358967213ae182574cd6a4c806fc090431af4..b2e53a931b40ec99e4c1f6672115f5862f808ffb 100644 --- a/src/inc/tsqlfunction.h +++ b/src/inc/tsqlfunction.h @@ -167,6 +167,11 @@ typedef struct SExtTagsInfo { struct SQLFunctionCtx **pTagCtxList; } SExtTagsInfo; +typedef struct SBoundaryData { + TSKEY key; + double data; +} SBoundaryData; + // sql function runtime context typedef struct SQLFunctionCtx { int32_t startOffset; @@ -195,6 +200,8 @@ typedef struct SQLFunctionCtx { SResultInfo *resultInfo; SExtTagsInfo tagInfo; + SBoundaryData prev; // this value may be less or equalled to the start time of time window + SBoundaryData next; // this value may be greater or equalled to the end time of time window } SQLFunctionCtx; typedef struct SQLAggFuncElem { diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index d5ceac2a321744df7f5a6de06315f0d8d7b96046..6dfc4709ed901a61bcfc21cb30ed7419f7e05c36 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -261,7 +261,7 @@ typedef struct SQuery { TSKEY ekey; int64_t intervalTime; int64_t slidingTime; // sliding time for sliding window query - char intervalTimeUnit; // interval data type, used for daytime revise + char slidingTimeUnit; // interval data type, used for daytime revise int8_t precision; int16_t numOfOutputCols; int16_t interpoType; diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index c26778b32866ee3252e1c5456a95b565d1cd2e44..71318ca7971c8343e41866275660ac81e8dfda0f 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -85,12 +85,6 @@ typedef enum { QUERY_NO_DATA_TO_CHECK = 0x8u, } vnodeQueryStatus; -typedef struct SPointInterpoSupporter { - int32_t numOfCols; - char** pPrevPoint; - char** pNextPoint; -} SPointInterpoSupporter; - typedef struct SBlockInfo { TSKEY keyFirst; TSKEY keyLast; @@ -285,6 +279,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); +SWindowResult* getWindowRes(SWindowResInfo* pWindowResInfo, size_t index); #ifdef __cplusplus } diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index 0e5f40cd47819c0f400068dce9c7e98eb6a77dd8..0b636db110b9816e14a32cf1de8a3a6b913cdf65 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -141,6 +141,12 @@ typedef struct SWindowResInfo { int64_t threshold; // threshold for return completed results. } SWindowResInfo; +typedef struct SPointInterpoSupporter { + int32_t numOfCols; + char** pPrevPoint; + char** pNextPoint; +} SPointInterpoSupporter; + typedef struct SQueryRuntimeEnv { SPositionInfo startPos; /* the start position, used for secondary/third iteration */ SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */ @@ -172,6 +178,10 @@ typedef struct SQueryRuntimeEnv { bool stableQuery; // is super table query or not SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file + bool hasTimeWindow; + char** lastRowInBlock; + bool interpoSearch; + /* * Temporarily hold the in-memory cache block info during scan cache blocks * Here we do not use the cache block info from pMeterObj, simple because it may change anytime diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 877602c97fafe2fcb03304049ecc27c0519e4bdd..3c1dde7ced23489e5b4ec33f603b1c8b22479119 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -80,6 +80,17 @@ static int32_t getGroupResultId(int32_t groupIndex) { return base + (groupIndex * 10000); } +static bool needsBoundaryTS(SQuery *pQuery) { + for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; + if (functionId == TSDB_FUNC_RATE) { + return true; + } + } + + return false; +} + static FORCE_INLINE bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } // check the offset value integrity @@ -579,9 +590,9 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) { return false; } -static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQueryTimestamp, void *inputData, +static void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t StartQueryTimestamp, void *inputData, char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, - int32_t blockStatus, void *param, int32_t scanFlag); + void *param); void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo); @@ -1518,14 +1529,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t w.ekey = w.skey + pQuery->intervalTime - 1; } - /* - * query border check, skey should not be bounded by the query time range, since the value skey will - * be used as the time window index value. So we only change ekey of time window accordingly. - */ - if (w.ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) { - w.ekey = pQuery->ekey; - } - assert(ts >= w.skey && ts <= w.ekey && w.skey != 0); return w; @@ -1651,8 +1654,12 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, continue; } - if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || - (pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { + /* + * when the ekey equals to lastKey of current block, do NOT close it, since the interpolation may + * be involved. + */ + if ((pResult->window.ekey < lastKey && QUERY_IS_ASC_QUERY(pQuery)) || + (pResult->window.skey > lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { closeTimeWindow(pWindowResInfo, i); } else { skey = pResult->window.skey; @@ -1742,7 +1749,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat pCtx[k].size = forwardStep; pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? startPos : startPos - (forwardStep - 1); - if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { + if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0 || functionId == TSDB_FUNC_RATE) { pCtx[k].ptsList = (TSKEY *)((char*)pRuntimeEnv->primaryColBuffer->data + pCtx[k].startOffset * TSDB_KEYSIZE); } @@ -1829,6 +1836,31 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow } } +static void handleInBlockEkeyInterpolation(SQueryRuntimeEnv* pRuntimeEnv, int32_t endPos, + const TSKEY* primaryKeyCol, STimeWindow* win, SQLFunctionCtx* pCtx) { + // this query time window ended in the current data block + SQuery* pQuery = pRuntimeEnv->pQuery; + TSKEY lastKey = primaryKeyCol[endPos]; + + TSKEY e = win->skey + pQuery->intervalTime; + TSKEY next = primaryKeyCol[endPos + 1]; + + // the next key is beyond the query time range + if ((next > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (next > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) { + pCtx->next.key = -1; + return; + } + + pCtx->next.key = e; + char *d = pCtx->aInputElemBuf + pCtx->inputBytes * endPos; + + SPoint point1 = (SPoint){.key = lastKey, .val = d}; + SPoint point2 = (SPoint){.key = next, .val = (d + pCtx->inputBytes)}; + SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); +} + static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) { TSKEY ekey = -1; if (QUERY_IS_ASC_QUERY(pQuery)) { @@ -1846,6 +1878,255 @@ static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) { return ekey; } +static void interpolateEndKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, STimeWindow* win, + int32_t endPos, SQLFunctionCtx* pCtx, int32_t index) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + TSKEY *primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; + + // if current query window beyonds the whole query window, do not employ the interpolation + if ((win->ekey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (win->ekey >= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) { + pCtx->next.key = -1; + return; + } + + if (!QUERY_IS_ASC_QUERY(pQuery) && win->skey >= pBlockInfo->keyLast) { + pCtx->next.key = -1; + return; + } + + if (QUERY_IS_ASC_QUERY(pQuery)) { + + /* + * the time window closed before current data block, use the interpolation to generate + * the final result part, endPos equals to -1 means that this time window ends before current data block. + */ + if (win->ekey < pBlockInfo->keyFirst) { + assert(endPos == -1); + + TSKEY prev = *(int64_t*) pRuntimeEnv->lastRowInBlock[0]; + TSKEY next = pBlockInfo->keyFirst; + + pCtx->next.key = win->skey + pQuery->intervalTime; + + char *d = pCtx->aInputElemBuf; + + SPoint point1 = (SPoint){.key = prev, .val = pRuntimeEnv->lastRowInBlock[index]}; + SPoint point2 = (SPoint){.key = next, .val = d}; + SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); + } else if (win->ekey < pBlockInfo->keyLast) { + handleInBlockEkeyInterpolation(pRuntimeEnv, endPos, primaryKeyCol, win, pCtx); + } else { + //do nothing now, the interpolation will be handled before processing the next data block + assert(win->ekey >= pBlockInfo->keyLast); + pCtx->next.key = -1; + } + } else { // desc order query + + //the time window closed before current data block, use the interpolation to generate the final result part. + if (win->ekey >= pBlockInfo->keyLast) { + TSKEY prev = pBlockInfo->keyLast; + TSKEY next = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0]; + + pCtx->next.key = win->skey + pQuery->intervalTime; + + char *d = pCtx->aInputElemBuf + (pBlockInfo->size - 1) * pCtx->inputBytes; + + SPoint point1 = (SPoint){.key = prev, .val = d}; + SPoint point2 = (SPoint){.key = next, .val = pRuntimeEnv->lastRowInBlock[index]}; + SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); + } else if (win->ekey < pBlockInfo->keyLast) { + handleInBlockEkeyInterpolation(pRuntimeEnv, endPos, primaryKeyCol, win, pCtx); + } else { + pCtx->next.key = -1; + } + } + +} + +static void handleInBlockSkeyInterpolation (SQueryRuntimeEnv* pRuntimeEnv, int32_t startPos, + const TSKEY* primaryKeyCol, STimeWindow* win, SQLFunctionCtx* pCtx) { + assert(startPos > 0); + + SQuery* pQuery = pRuntimeEnv->pQuery; + + TSKEY prev = primaryKeyCol[startPos - 1]; + TSKEY next = primaryKeyCol[startPos]; + + if (!QUERY_IS_ASC_QUERY(pQuery) && prev < pQuery->ekey) { + pCtx->prev.key = -1; + return; + } + + pCtx->prev.key = win->skey; + char *d = pCtx->aInputElemBuf + pCtx->inputBytes * (startPos - 1); + + SPoint point1 = (SPoint){.key = prev, .val = d}; + SPoint point2 = (SPoint){.key = next, .val = (d + pCtx->inputBytes)}; + SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); +} + +static void interpolateStartKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, SWindowResInfo* pWindowResInfo, + STimeWindow* win, int32_t startPos, SQLFunctionCtx* pCtx, int32_t index) { + SQuery* pQuery = pRuntimeEnv->pQuery; + TSKEY *primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; + TSKEY skey = primaryKeyCol[startPos]; + + /* + * no need the start time interpolation + * 1. current window is the first window in either ascending or descending order output + * 2. time window start exactly from a timestamp with data + */ + if (skey == win->skey || win->skey < pWindowResInfo->startTime || + (win->skey <= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || + (win->skey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + pCtx->prev.key = -1; + return; + } + + if (QUERY_IS_ASC_QUERY(pQuery)) { + // the queried time window and time window of data block must be intersect +// assert(win->ekey >= pBlockInfo->keyFirst && win->skey <= pBlockInfo->keyLast); + + /* + * this win should not be the first time window that starts from a less timestamp than + * the skey of current data block + */ + if (win->skey < pBlockInfo->keyFirst) { + TSKEY prev = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0]; + TSKEY next = pBlockInfo->keyFirst; + + pCtx->prev.key = win->skey; + char *d = pCtx->aInputElemBuf; + + SPoint point1 = (SPoint){.key = prev, .val = pRuntimeEnv->lastRowInBlock[index]}; + SPoint point2 = (SPoint){.key = next, .val = d}; + SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); + } else { + handleInBlockSkeyInterpolation(pRuntimeEnv, startPos, primaryKeyCol, win, pCtx); + } + } else { // desc order + if (win->skey > pBlockInfo->keyLast) { + //this pBlockInfo located before current time window + TSKEY prev = pBlockInfo->keyLast; + TSKEY next = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0]; + + pCtx->prev.key = win->skey; + char *d = pCtx->aInputElemBuf + (pBlockInfo->size - 1) * pCtx->inputBytes; + + SPoint point1 = (SPoint){.key = prev, .val = d}; + SPoint point2 = (SPoint){.key = next, .val = pRuntimeEnv->lastRowInBlock[index]}; + SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data}; + + taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point); + } else { + // the queried time window and time window of data block must be intersect + assert(win->ekey >= pBlockInfo->keyFirst && win->skey <= pBlockInfo->keyLast); + + if (win->skey >= pBlockInfo->keyFirst) { + // the pBlockInfo is intersected with query time window + handleInBlockSkeyInterpolation(pRuntimeEnv, startPos, primaryKeyCol, win, pCtx); + } else { + assert(win->skey < pBlockInfo->keyFirst && win->ekey >= pBlockInfo->keyFirst); + pCtx->prev.key = -1; + } + } + } +} + +static void doSetInterpolationDataForTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo *pWindowResInfo, + SBlockInfo* pBlockInfo, STimeWindow* win, int32_t startPos, int32_t forwardStep) { + + SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + if (!pRuntimeEnv->interpoSearch) { + return; + } + + int32_t s = startPos; + int32_t e = forwardStep * step + startPos - step; + + if (!QUERY_IS_ASC_QUERY(pQuery)) { + SWAP(s, e, int32_t); + } + + // interpolate for skey value + for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + if (pQuery->pSelectExpr[i].pBase.functionId != TSDB_FUNC_RATE) { + continue; + } + + SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; + interpolateStartKeyValue(pRuntimeEnv, pBlockInfo, pWindowResInfo, win, s, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); + } + + // interpolate for ekey value + for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + if (pQuery->pSelectExpr[i].pBase.functionId != TSDB_FUNC_RATE) { + continue; + } + + SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; + interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); + } +} + +static void doInterpolatePrevTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo, SBlockInfo* pBlockInfo, + TSKEY ts, int32_t offset, STimeWindow* win) { + // get current not closed time window + SQuery* pQuery = pRuntimeEnv->pQuery; + + int32_t slot = pWindowResInfo->curIndex; + if (slot == -1 || !pRuntimeEnv->interpoSearch) { + return; + } + + while (slot < pWindowResInfo->size) { + STimeWindow w = getWindowResult(pWindowResInfo, slot)->window; + if (w.skey == win->skey) { + assert(w.ekey == win->ekey); + break; + } + + // do not check for the closed time window + SWindowResult* pWindowRes = getWindowRes(pWindowResInfo, slot); + if (pWindowRes->status.closed) { + slot += 1; + continue; + } + + // if current active window locates before current data block, do interpolate the result and close it + assert((w.skey < win->skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) || + (w.skey > win->skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery))); + + int32_t forwardStep = 0; + doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep); + + // set correct output buffer for interplate result. todo handle error + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &w) != TSDB_CODE_SUCCESS) { + continue; + } + + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, slot); + doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep); + + closeTimeWindow(pWindowResInfo, slot); + + // try next time window + slot += 1; + } +} + /** * * @param pRuntimeEnv @@ -1872,7 +2153,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; SField dummyField = {0}; - bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep); @@ -1890,29 +2170,33 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } } - setExecParams(pQuery, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, - hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); + setExecParams(pRuntimeEnv, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, + hasNull, &sasArray[k]); } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (isIntervalQuery(pQuery)) { int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); TSKEY ts = primaryKeyCol[offset]; - + STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); + doInterpolatePrevTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, ts, offset, &win); + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) { return 0; } TSKEY ekey = reviseWindowEkey(pQuery, &win); forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true); - + + doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, offset, forwardStep); + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep); - int32_t index = pWindowResInfo->curIndex; + int32_t index = pWindowResInfo->curIndex; + STimeWindow nextWin = win; - while (1) { int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, pBlockInfo, primaryKeyCol, searchFn); @@ -1928,7 +2212,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ekey = reviseWindowEkey(pQuery, &nextWin); forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true); - + + doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &nextWin, startPos, forwardStep); + pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep); } @@ -1942,6 +2228,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t */ for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + pCtx[k].next.key = -1; + pCtx[k].prev.key = -1; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunction(&pCtx[k]); } @@ -1956,6 +2245,14 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t if (!isIntervalQuery(pQuery)) { num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; } + + // save the last row in current data block + for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + SColumnInfo* pColInfo = &pQuery->colList[i].data; + int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0; + + memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes); + } tfree(sasArray); return (int32_t)num; @@ -2166,6 +2463,31 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { } } +SWindowResult* getWindowRes(SWindowResInfo* pWindowResInfo, size_t index) { + assert(index < pWindowResInfo->size); + return &pWindowResInfo->pResult[index]; +} + +/* + * remove the results that are not the FIRST time window that spreads beyond the + * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time + */ +void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) { + assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); + + int32_t i = 0; + while(i < pWindowResInfo->size && + ((pWindowResInfo->pResult[i].window.ekey < lastKey && order == QUERY_ASC_FORWARD_STEP) || + (pWindowResInfo->pResult[i].window.skey > lastKey && order == QUERY_DESC_FORWARD_STEP))) { + ++i; + } + +// assert(i < pWindowResInfo->size); + if (i < pWindowResInfo->size) { + pWindowResInfo->size = (i + 1); + } +} + static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { if (isNull(pData, type)) { // ignore the null value return -1; @@ -2311,10 +2633,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep); - TSKEY ts = pQuery->skey; // QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalWindow.skey : - // pRuntimeEnv->intervalWindow.ekey; - setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull, - pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); + TSKEY ts = pQuery->skey; + setExecParams(pRuntimeEnv, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull, + &sasArray[k]); } // set the input column data @@ -2340,7 +2661,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * int32_t j = 0; TSKEY lastKey = -1; - + int32_t lastIndex = -1; + bool firstAccessedPoint = true; + for (j = 0; j < (*forwardStep); ++j) { int32_t offset = GET_COL_DATA_POS(pQuery, j, step); @@ -2362,8 +2685,31 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * // interval window query if (isIntervalQuery(pQuery)) { // decide the time window according to the primary timestamp - int64_t ts = primaryKeyCol[offset]; + TSKEY ts = primaryKeyCol[offset]; + STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); +// if (firstAccessedPoint) { +// doInterpolatePrevTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, ts, offset, &win); +// firstAccessedPoint = false; +// } else { +// int32_t index = pWindowResInfo->curIndex; +// STimeWindow w = getWindowResult(pWindowResInfo, index)->window; +// +// if (w.skey == win.skey) { // do nothing +// assert(w.ekey == win.ekey); +// } else { +// assert((w.skey < win.skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) || +// (w.skey > win.skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery))); +// +// // set the endkey interpolation for the previous +// for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { +// SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; +// +// interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); +// } +// +// } +// } int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code @@ -2377,6 +2723,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset); lastKey = ts; + lastIndex = j; + STimeWindow nextWin = win; int32_t index = pWindowResInfo->curIndex; int32_t sid = pRuntimeEnv->pMeterObj->sid; @@ -2421,6 +2769,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + pCtx[k].next.key = -1; + pCtx[k].prev.key = -1; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunctionF(&pCtx[k], offset); } @@ -2434,7 +2785,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * break; } } - + /* * pointsOffset is the maximum available space in result buffer update the actual forward step for query that * requires checking buffer during loop @@ -2445,6 +2796,15 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * break; } } + + // save the last accessed row of current data block for interpolation + int32_t index = GET_COL_DATA_POS(pQuery, lastIndex, step); + for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + SColumnInfo* pColInfo = &pQuery->colList[i].data; + int32_t s = pColInfo->bytes * index; + + memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes); + } free(sasArray); @@ -2657,11 +3017,24 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter return fileIndex; } -void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimestamp, void *inputData, +static void getOneRowFromDataBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, int32_t pos) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + for (int32_t i = 0; i < pQuery->numOfCols; ++i) { + int32_t bytes = pQuery->colList[i].data.bytes; + memcpy(dst[i], pRuntimeEnv->colDataBuffer[i]->data + pos * bytes, bytes); + } +} + +void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t startQueryTimestamp, void *inputData, char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, - int32_t blockStatus, void *param, int32_t scanFlag) { + void *param) { + SQuery* pQuery = pRuntimeEnv->pQuery; int32_t startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (size - 1); - + + int32_t scanFlag = pRuntimeEnv->scanFlag; + int32_t blockStatus = pRuntimeEnv->blockStatus; + pCtx->nStartQueryTimestamp = startQueryTimestamp; pCtx->scanFlag = scanFlag; @@ -2913,6 +3286,12 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->pInterpoBuf); } + for (int32_t i = 0; i < pQuery->numOfCols; ++i) { + tfree(pRuntimeEnv->lastRowInBlock[i]); + } + + tfree(pRuntimeEnv->lastRowInBlock); + destroyDiskbasedResultBuf(pRuntimeEnv->pResultBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } @@ -2924,6 +3303,7 @@ static int64_t getOldestKey(int32_t numOfFiles, int64_t fileId, SVnodeCfg *pCfg) } bool isQueryKilled(SQuery *pQuery) { + return false; SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); /* @@ -3324,7 +3704,7 @@ void vnodeCheckIfDataExists(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast, int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey) { assert(pKey >= keyFirst && pKey <= keyLast); - *skey = taosGetIntervalStartTimestamp(pKey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); + *skey = taosGetIntervalStartTimestamp(pKey, pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision); if (keyFirst > (INT64_MAX - pQuery->intervalTime)) { /* @@ -3355,13 +3735,62 @@ void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t ke } } -static void getOneRowFromDataBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, int32_t pos) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - int32_t bytes = pQuery->colList[i].data.bytes; - memcpy(dst[i], pRuntimeEnv->colDataBuffer[i]->data + pos * bytes, bytes); +static bool loadPrevDataPoint(SQueryRuntimeEnv* pRuntimeEnv, char** result) { + SQuery* pQuery = pRuntimeEnv->pQuery; + SMeterObj* pMeterObj = pRuntimeEnv->pMeterObj; + + /* the qualified point is not the first point in data block */ + if (pQuery->pos > 0) { + int32_t prevPos = pQuery->pos - 1; + + /* save the point that is directly after the specified point */ + getOneRowFromDataBlock(pRuntimeEnv, result, prevPos); + } else { + __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; + + savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos); + + // backwards movement would not set the pQuery->pos correct. We need to set it manually later. + moveToNextBlock(pRuntimeEnv, QUERY_DESC_FORWARD_STEP, searchFn, true); + + /* + * no previous data exists. + * reset the status and load the data block that contains the qualified point + */ + if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { + dTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%" PRId64 "-%" PRId64 + ", out of range", + GET_QINFO_ADDR(pQuery), pRuntimeEnv->startPos.fileId, pRuntimeEnv->startPos.slot, + pRuntimeEnv->startPos.pos, pQuery->skey, pQuery->ekey); + + // no result, return immediately + setQueryStatus(pQuery, QUERY_COMPLETED); + return false; + } else { // prev has been located + if (pQuery->fileId >= 0) { + pQuery->pos = pQuery->pBlock[pQuery->slot].numOfPoints - 1; + getOneRowFromDataBlock(pRuntimeEnv, result, pQuery->pos); + + qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), + pQuery->fileId, pQuery->slot, pQuery->pos, pQuery->pos); + + // restore to the start position + loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->startPos); + } else { + // moveToNextBlock make sure there is a available cache block, if exists + assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD); + SCacheBlock* pBlock = &pRuntimeEnv->cacheBlock; + + pQuery->pos = pBlock->numOfPoints - 1; + getOneRowFromDataBlock(pRuntimeEnv, result, pQuery->pos); + + qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), + pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos); + } + } } + + return true; } static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMeterObj, @@ -3381,10 +3810,8 @@ static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMet } else { assert(QUERY_IS_ASC_QUERY(pQuery)); } + assert(pPointInterpSupporter != NULL && pQuery->skey == pQuery->ekey); - - SCacheBlock *pBlock = NULL; - qTrace("QInfo:%p get next data point, fileId:%d, slot:%d, pos:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->slot, pQuery->pos); @@ -3413,55 +3840,9 @@ static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMet } return true; } - - /* the qualified point is not the first point in data block */ - if (pQuery->pos > 0) { - int32_t prevPos = pQuery->pos - 1; - - /* save the point that is directly after the specified point */ - getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, prevPos); - } else { - __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; - - savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos); - - // backwards movement would not set the pQuery->pos correct. We need to set it manually later. - moveToNextBlock(pRuntimeEnv, QUERY_DESC_FORWARD_STEP, searchFn, true); - - /* - * no previous data exists. - * reset the status and load the data block that contains the qualified point - */ - if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { - dTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%" PRId64 "-%" PRId64 - ", out of range", - GET_QINFO_ADDR(pQuery), pRuntimeEnv->startPos.fileId, pRuntimeEnv->startPos.slot, - pRuntimeEnv->startPos.pos, pQuery->skey, pQuery->ekey); - - // no result, return immediately - setQueryStatus(pQuery, QUERY_COMPLETED); - return false; - } else { // prev has been located - if (pQuery->fileId >= 0) { - pQuery->pos = pQuery->pBlock[pQuery->slot].numOfPoints - 1; - getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos); - - qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), - pQuery->fileId, pQuery->slot, pQuery->pos, pQuery->pos); - } else { - // moveToNextBlock make sure there is a available cache block, if exists - assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD); - pBlock = &pRuntimeEnv->cacheBlock; - - pQuery->pos = pBlock->numOfPoints - 1; - getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos); - - qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), - pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos); - } - } - } - + + loadPrevDataPoint(pRuntimeEnv, pPointInterpSupporter->pPrevPoint); + pQuery->skey = *(TSKEY *)pPointInterpSupporter->pPrevPoint[0]; pQuery->ekey = *(TSKEY *)pPointInterpSupporter->pNextPoint[0]; pQuery->lastKey = pQuery->skey; @@ -3635,8 +4016,17 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, STableQuerySup if (key != NULL) { *key = nextKey; } - - return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter); + + // needs the data before the begin timestamp of query time window + if (nextKey != pQuery->skey) { + if (!pRuntimeEnv->hasTimeWindow) { + pQuery->skey = nextKey; // change the query skey + pQuery->lastKey = pQuery->skey; + } + return true; + } else { + return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter); + } } // set no data in file @@ -4223,63 +4613,6 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj * } } -// if (win.ekey <= blockInfo.keyLast) { -// pQuery->limit.offset -= 1; -// -// if (win.ekey == blockInfo.keyLast) { -// moveToNextBlock(pRuntimeEnv, step, searchFn, false); -// if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { -// break; -// } -// -// // next block does not included in time range, abort query -// blockInfo = getBlockInfo(pRuntimeEnv); -// if ((blockInfo.keyFirst > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || -// (blockInfo.keyLast < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { -// setQueryStatus(pQuery, QUERY_COMPLETED); -// break; -// } -// -// // set the window that start from the next data block -// win = getActiveTimeWindow(pWindowResInfo, blockInfo.keyFirst, pQuery); -// } else { -// // the time window is closed in current data block, load disk file block into memory to -// // check the next time window -// if (IS_DISK_DATA_BLOCK(pQuery)) { -// getTimestampInDiskBlock(pRuntimeEnv, 0); -// } -// -// STimeWindow nextWin = win; -// int32_t startPos = -// getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, &blockInfo, primaryKey, searchFn); -// -// if (startPos < 0) { // failed to find the qualified time window -// assert((nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || -// (nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))); -// -// setQueryStatus(pQuery, QUERY_COMPLETED); -// break; -// } else { // set the abort info -// pQuery->pos = startPos; -// pQuery->lastKey = primaryKey[startPos]; -// win = nextWin; -// } -// } -// -// continue; -// } -// -// moveToNextBlock(pRuntimeEnv, step, searchFn, false); -// if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { -// break; -// } -// -// blockInfo = getBlockInfo(pRuntimeEnv); -// if ((blockInfo.keyFirst > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || -// (blockInfo.keyLast < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { -// setQueryStatus(pQuery, QUERY_COMPLETED); -// break; -// } } if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED) || pQuery->limit.offset > 0) { @@ -4468,7 +4801,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI } void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSupport) { - if (isPointInterpoQuery(pQuery)) { + if (isPointInterpoQuery(pQuery) || needsBoundaryTS(pQuery)) { pInterpoSupport->pPrevPoint = malloc(pQuery->numOfCols * POINTER_BYTES); pInterpoSupport->pNextPoint = malloc(pQuery->numOfCols * POINTER_BYTES); @@ -4547,12 +4880,15 @@ static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *p SQuery *pQuery = pRuntimeEnv->pQuery; // To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system. + pRuntimeEnv->lastRowInBlock = calloc(pQuery->numOfCols, POINTER_BYTES); for (int32_t i = 0; i < pQuery->numOfCols; ++i) { int32_t bytes = pQuery->colList[i].data.bytes; pRuntimeEnv->colDataBuffer[i] = calloc(1, sizeof(SData) + EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes); if (pRuntimeEnv->colDataBuffer[i] == NULL) { goto _error_clean; } + + pRuntimeEnv->lastRowInBlock[i] = calloc(1, bytes); } // record the maximum column width among columns of this meter/metric @@ -4666,7 +5002,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pMeterObj = pMeterObj; - + pRuntimeEnv->hasTimeWindow = !notHasQueryTimeRange(pQuery); + pRuntimeEnv->interpoSearch = needsBoundaryTS(pQuery); + if ((code = allocateRuntimeEnvBuf(pRuntimeEnv, pMeterObj)) != TSDB_CODE_SUCCESS) { return code; } @@ -4721,9 +5059,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery /* query on single table */ pSupporter->numOfMeters = 1; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - - SPointInterpoSupporter interpInfo = {0}; - pointInterpSupporterInit(pQuery, &interpInfo); + + SPointInterpoSupporter interpoSupporter = {0}; + pointInterpSupporterInit(pQuery, &interpoSupporter); /* * in case of last_row query without query range, we set the query timestamp to @@ -4731,11 +5069,11 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery */ if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) { - if (!normalizeUnBoundLastRowQuery(pSupporter, &interpInfo)) { + if (!normalizeUnBoundLastRowQuery(pSupporter, &interpoSupporter)) { sem_post(&pQInfo->dataReady); pQInfo->over = 1; - pointInterpSupporterDestroy(&interpInfo); + pointInterpSupporterDestroy(&interpoSupporter); return TSDB_CODE_SUCCESS; } } else { // find the skey and ekey in case of sliding query @@ -4749,23 +5087,34 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery } int64_t skey = 0; - if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &skey) == false) || + if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpoSupporter, &skey) == false) || (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || (isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) { sem_post(&pQInfo->dataReady); pQInfo->over = 1; - pointInterpSupporterDestroy(&interpInfo); + pointInterpSupporterDestroy(&interpoSupporter); return TSDB_CODE_SUCCESS; } + pQuery->skey = skey; if (!QUERY_IS_ASC_QUERY(pQuery)) { win.skey = minKey; win.ekey = skey; + pQuery->ekey = minKey; } else { win.skey = skey; win.ekey = pQuery->ekey; } + + // empty result + if (QUERY_IS_ASC_QUERY(pQuery) && win.skey > win.ekey) { + sem_post(&pQInfo->dataReady); + pQInfo->over = 1; + + pointInterpSupporterDestroy(&interpoSupporter); + return TSDB_CODE_SUCCESS; + } TSKEY skey1, ekey1; TSKEY windowSKey = 0, windowEKey = 0; @@ -4784,13 +5133,13 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery pQuery->over = QUERY_NOT_COMPLETED; } else { int64_t ekey = 0; - if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &ekey) == false) || + if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpoSupporter, &ekey) == false) || (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || (isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) { sem_post(&pQInfo->dataReady); pQInfo->over = 1; - pointInterpSupporterDestroy(&interpInfo); + pointInterpSupporterDestroy(&interpoSupporter); return TSDB_CODE_SUCCESS; } } @@ -4800,14 +5149,14 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery * here we set the value for before and after the specified time into the * parameter for interpolation query */ - pointInterpSupporterSetData(pQInfo, &interpInfo); - pointInterpSupporterDestroy(&interpInfo); + pointInterpSupporterSetData(pQInfo, &interpoSupporter); + pointInterpSupporterDestroy(&interpoSupporter); if (!forwardQueryStartPosIfNeeded(pQInfo, pSupporter, dataInDisk, dataInCache)) { return TSDB_CODE_SUCCESS; } - int64_t rs = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime, pQuery->intervalTimeUnit, + int64_t rs = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime, pQuery->slidingTimeUnit, pQuery->precision); taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0); allocMemForInterpo(pSupporter, pQuery, pMeterObj); @@ -4888,6 +5237,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { pSupporter->rawEKey = pQuery->ekey; pSupporter->rawSKey = pQuery->skey; pQuery->lastKey = pQuery->skey; + pRuntimeEnv->interpoSearch = needsBoundaryTS(pQuery); // create runtime environment SColumnModel *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel; @@ -4943,7 +5293,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { } TSKEY revisedStime = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime, - pQuery->intervalTimeUnit, pQuery->precision); + pQuery->slidingTimeUnit, pQuery->precision); taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); pRuntimeEnv->stableQuery = true; @@ -5389,6 +5739,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { closeAllTimeWindow(&pRuntimeEnv->windowResInfo); + removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step); + pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; } else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv); @@ -5448,7 +5800,6 @@ void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SM } // set the join tag for first column - SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase; if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIdx == PRIMARYKEY_TIMESTAMP_COL_INDEX && pRuntimeEnv->pTSBuf != NULL) { assert(pFuncMsg->numOfParams == 1); @@ -5474,9 +5825,6 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes pCtx[i].hasNull = true; pCtx[i].nStartQueryTimestamp = timestamp; pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes); - // pCtx[i].aInputElemBuf = ((char *)inputSrc->data) + - // ((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + - // pCtx[i].outputBytes * inputIdx; // in case of tag column, the tag information should be extracted from input buffer if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) { @@ -7172,7 +7520,6 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey); pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp - // assert(pWindowResInfo->startTime > 0); if (pWindowResInfo->prevSKey == 0) { if (QUERY_IS_ASC_QUERY(pQuery)) { @@ -7477,6 +7824,8 @@ void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataIn updateWindowResNumOfRes(pRuntimeEnv, pMeterDataInfo); updatelastkey(pQuery, pMeterQueryInfo); + + doCheckQueryCompleted(pRuntimeEnv, pMeterQueryInfo->lastKey, pWindowResInfo); } // we need to split the refstatsult into different packages. @@ -7536,7 +7885,7 @@ bool vnodeHasRemainResults(void *handle) { // query has completed if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->intervalTime, - pQuery->intervalTimeUnit, pQuery->precision); + pQuery->slidingTimeUnit, pQuery->precision); int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pRuntimeEnv->pInterpoBuf[0]->data, remain, pQuery->intervalTime, ekey, pQuery->pointsToRead); return numOfTotal > 0; @@ -7647,7 +7996,7 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->intervalTime, - pQuery->intervalTimeUnit, pQuery->precision); + pQuery->slidingTimeUnit, pQuery->precision); int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data, numOfRows, pQuery->intervalTime, ekey, pQuery->pointsToRead); diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 7e25bfaba478071fa826028d53d232edb6bd29f1..08b0b1eba6f2dcd18ae6d5c9d716404e7bff08a6 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -269,7 +269,7 @@ static SQInfo *vnodeAllocateQInfoEx(SQueryMeterMsg *pQueryMsg, SSqlGroupbyExpr * pQuery->intervalTime = pQueryMsg->intervalTime; pQuery->slidingTime = pQueryMsg->slidingTime; pQuery->interpoType = pQueryMsg->interpoType; - pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit; + pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; pQInfo->query.pointsToRead = vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock; diff --git a/src/util/src/tinterpolation.c b/src/util/src/tinterpolation.c index cb7c8854ce914d22680db5429871857ac445f1fe..5c0b867208db595ad533406ce320c7940495c669 100644 --- a/src/util/src/tinterpolation.c +++ b/src/util/src/tinterpolation.c @@ -22,12 +22,12 @@ #define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC) -int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision) { +int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char slidingTimeUnit, int16_t precision) { if (timeRange == 0) { return startTime; } - if (intervalTimeUnit == 'a' || intervalTimeUnit == 'm' || intervalTimeUnit == 's' || intervalTimeUnit == 'h') { + if (slidingTimeUnit == 'a' || slidingTimeUnit == 'm' || slidingTimeUnit == 's' || slidingTimeUnit == 'h') { return (startTime / timeRange) * timeRange; } else { /* @@ -95,11 +95,11 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows; } -TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision) { +TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t slidingTimeUnit, int8_t precision) { if (order == TSQL_SO_ASC) { return ekey; } else { - return taosGetIntervalStartTimestamp(ekey, timeInterval, intervalTimeUnit, precision); + return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision); } } @@ -191,6 +191,49 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi return 0; } +int taosDoLinearInterpolationD(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) { + switch (type) { + case TSDB_DATA_TYPE_INT: { + *(double*) point->val = doLinearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key, + point2->key, point->key); + break; + } + case TSDB_DATA_TYPE_FLOAT: { + *(double*)point->val = + doLinearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key); + break; + }; + case TSDB_DATA_TYPE_DOUBLE: { + *(double*)point->val = + doLinearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key); + break; + }; + case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_BIGINT: { + *(double*)point->val = doLinearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key, + point2->key, point->key); + break; + }; + case TSDB_DATA_TYPE_SMALLINT: { + *(double*)point->val = doLinearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key, + point2->key, point->key); + break; + }; + case TSDB_DATA_TYPE_TINYINT: { + *(double*)point->val = + doLinearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key); + break; + }; + default: { + // TODO: Deal with interpolation with bool and strings and timestamp + return -1; + } + } + + return 0; +} + + static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; } static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order,