未验证 提交 a160b2dc 编写于 作者: P plum-lihui 提交者: GitHub

Merge pull request #1545 from taosdata/feature/liaohj

Feature/liaohj
...@@ -197,7 +197,7 @@ typedef struct SDataBlockList { ...@@ -197,7 +197,7 @@ typedef struct SDataBlockList {
typedef struct SQueryInfo { typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately. int16_t command; // the command may be different for each subclause, so keep it seperately.
uint16_t type; // query/insert/import type uint16_t type; // query/insert/import type
char intervalTimeUnit; char slidingTimeUnit;
int64_t etime, stime; int64_t etime, stime;
int64_t intervalTime; // aggregation time interval int64_t intervalTime; // aggregation time interval
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "thistogram.h" #include "thistogram.h"
#include "tinterpolation.h" #include "tinterpolation.h"
#include "tlog.h" #include "tlog.h"
#include "tpercentile.h"
#include "tscJoinProcess.h" #include "tscJoinProcess.h"
#include "tscSyntaxtreefunction.h" #include "tscSyntaxtreefunction.h"
#include "tscompression.h" #include "tscompression.h"
...@@ -27,7 +28,6 @@ ...@@ -27,7 +28,6 @@
#include "ttime.h" #include "ttime.h"
#include "ttypes.h" #include "ttypes.h"
#include "tutil.h" #include "tutil.h"
#include "tpercentile.h"
#define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes)) #define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes))
#define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes) #define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes)
...@@ -4104,8 +4104,6 @@ static void twa_function(SQLFunctionCtx *pCtx) { ...@@ -4104,8 +4104,6 @@ static void twa_function(SQLFunctionCtx *pCtx) {
if (pResInfo->superTableQ) { if (pResInfo->superTableQ) {
memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo)); memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo));
} }
// pCtx->numOfIteratedElems += notNullElems;
} }
static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
...@@ -4138,7 +4136,6 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -4138,7 +4136,6 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
pInfo->lastKey = primaryKey[index]; pInfo->lastKey = primaryKey[index];
setTWALastVal(pCtx, pData, 0, pInfo); setTWALastVal(pCtx, pData, 0, pInfo);
// pCtx->numOfIteratedElems += 1;
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
if (pResInfo->superTableQ) { if (pResInfo->superTableQ) {
...@@ -4403,10 +4400,8 @@ static double do_calc_rate(const SRateInfo* pRateInfo) { ...@@ -4403,10 +4400,8 @@ static double do_calc_rate(const SRateInfo* pRateInfo) {
} }
} }
int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey; double duration = (pRateInfo->lastKey - pRateInfo->firstKey) / 1000.0;
duration = (duration + 500) / 1000; double resultVal = diff / duration;
double resultVal = ((double)diff) / duration;
pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%f lastValue:%f CorrectionValue:%f resultVal:%f", pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%f lastValue:%f CorrectionValue:%f resultVal:%f",
pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal); pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal);
...@@ -4447,62 +4442,156 @@ static void rate_function(SQLFunctionCtx *pCtx) { ...@@ -4447,62 +4442,156 @@ static void rate_function(SQLFunctionCtx *pCtx) {
TSKEY *primaryKey = pCtx->ptsList; TSKEY *primaryKey = pCtx->ptsList;
pTrace("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); pTrace("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull);
for (int32_t i = 0; i < pCtx->size; ++i) { if (pCtx->order == TSQL_SO_ASC) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); // prev interpolation exists
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { if (pCtx->prev.key != -1) {
pTrace("%p rate_function() index of null data:%d", pCtx, i); pRateInfo->firstValue = pCtx->prev.data;
continue; pRateInfo->firstKey = pCtx->prev.key;
pCtx->prev.key = -1; // clear the flag
} }
notNullElems++; for (int32_t i = 0; i < pCtx->size; ++i) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
pTrace("%p rate_function() index of null data:%d", pCtx, i);
continue;
}
double v = 0; notNullElems++;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT: double v = 0;
v = (double)GET_INT8_VAL(pData); switch (pCtx->inputType) {
break; case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT: v = (double)GET_INT8_VAL(pData);
v = (double)GET_INT16_VAL(pData); break;
break; case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT: v = (double)GET_INT16_VAL(pData);
v = (double)GET_INT32_VAL(pData); break;
break; case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT: v = (double)GET_INT32_VAL(pData);
v = (double)GET_INT64_VAL(pData); break;
break; case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_FLOAT: v = (double)GET_INT64_VAL(pData);
v = (double)GET_FLOAT_VAL(pData); break;
break; case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: v = (double)GET_FLOAT_VAL(pData);
v = (double)GET_DOUBLE_VAL(pData); break;
break; case TSDB_DATA_TYPE_DOUBLE:
default: v = (double)GET_DOUBLE_VAL(pData);
assert(0); break;
default:
assert(0);
}
if ((-DBL_MAX == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
pRateInfo->firstValue = v;
pRateInfo->firstKey = primaryKey[i];
pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey);
}
if (-DBL_MAX == pRateInfo->lastValue) {
pRateInfo->lastValue = v;
} else if (v < pRateInfo->lastValue) {
pRateInfo->CorrectionValue += pRateInfo->lastValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
pTrace("lastValue:%f lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey);
} }
if ((-DBL_MAX == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { if (!pCtx->hasNull) {
pRateInfo->firstValue = v; assert(pCtx->size == notNullElems);
pRateInfo->firstKey = primaryKey[i]; }
if (pCtx->next.key != -1) {
if (pCtx->next.data < pRateInfo->lastValue) {
pRateInfo->CorrectionValue += pRateInfo->lastValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
pRateInfo->lastValue = pCtx->next.data;
pRateInfo->lastKey = pCtx->next.key;
pCtx->next.key = -1;
}
} else {
if (pCtx->next.key != -1) {
pRateInfo->lastValue = pCtx->next.data;
pRateInfo->lastKey = pCtx->next.key;
pCtx->next.key = -1;
}
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
pTrace("%p rate_function() index of null data:%d", pCtx, i);
continue;
}
notNullElems++;
double v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = (double)GET_INT8_VAL(pData);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = (double)GET_INT16_VAL(pData);
break;
case TSDB_DATA_TYPE_INT:
v = (double)GET_INT32_VAL(pData);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (double)GET_INT64_VAL(pData);
break;
case TSDB_DATA_TYPE_FLOAT:
v = (double)GET_FLOAT_VAL(pData);
break;
case TSDB_DATA_TYPE_DOUBLE:
v = (double)GET_DOUBLE_VAL(pData);
break;
default:
assert(0);
}
if ((-DBL_MAX == pRateInfo->lastValue) || (INT64_MIN == pRateInfo->lastKey)) {
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey);
}
if (-DBL_MAX == pRateInfo->firstValue) {
pRateInfo->firstValue = v;
} else if (v > pRateInfo->firstValue) {
pRateInfo->CorrectionValue += pRateInfo->firstValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
pRateInfo->firstValue = v;
pRateInfo->firstKey = primaryKey[i];
pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey); pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey);
} }
if (-DBL_MAX == pRateInfo->lastValue) { if (!pCtx->hasNull) {
pRateInfo->lastValue = v; assert(pCtx->size == notNullElems);
} else if (v < pRateInfo->lastValue) {
pRateInfo->CorrectionValue += pRateInfo->lastValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
} }
pRateInfo->lastValue = v; if (pCtx->prev.key != -1) {
pRateInfo->lastKey = primaryKey[i]; if (pCtx->prev.data > pRateInfo->firstValue) {
pTrace("lastValue:%f lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey); pRateInfo->CorrectionValue += pRateInfo->firstValue;
} pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems); pRateInfo->firstValue = pCtx->prev.data;
} pRateInfo->firstKey = pCtx->prev.key;
pCtx->prev.key = -1;
}
};
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
......
...@@ -598,9 +598,6 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { ...@@ -598,9 +598,6 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000; pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000;
} }
/* parser has filter the illegal type, no need to check here */
pQueryInfo->intervalTimeUnit = pQuerySql->interval.z[pQuerySql->interval.n - 1];
// interval cannot be less than 10 milliseconds // interval cannot be less than 10 milliseconds
if (pQueryInfo->intervalTime < tsMinIntervalTime) { if (pQueryInfo->intervalTime < tsMinIntervalTime) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2); return invalidSqlErrMsg(pQueryInfo->msg, msg2);
...@@ -689,10 +686,15 @@ int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { ...@@ -689,10 +686,15 @@ int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) { if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1); return invalidSqlErrMsg(pQueryInfo->msg, msg1);
} }
pQueryInfo->slidingTimeUnit = pQuerySql->sliding.z[pQuerySql->sliding.n - 1];
} else { } else {
pQueryInfo->slidingTime = pQueryInfo->intervalTime; pQueryInfo->slidingTime = pQueryInfo->intervalTime;
// parser has filter the illegal type, no need to check here
pQueryInfo->slidingTimeUnit = pQuerySql->interval.z[pQuerySql->interval.n - 1];
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1636,13 +1638,16 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt ...@@ -1636,13 +1638,16 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
// set the first column ts for diff query // set the first column ts for diff query
if (optr == TK_DIFF) { if (optr == TK_DIFF) {
colIdx += 1; colIdx += 1;
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0}; SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
TSDB_KEYSIZE, TSDB_KEYSIZE); TSDB_KEYSIZE, TSDB_KEYSIZE);
SColumnList ids = getColumnList(1, 0, 0); SColumnList ids = getColumnList(1, 0, 0);
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName, insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName,
pExpr); pExpr);
} else if (optr == TK_RATE) {
SColumnIndex index1 = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnBaseInfoInsert(pQueryInfo, &index1);
} }
// functions can not be applied to tags // functions can not be applied to tags
......
...@@ -325,7 +325,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -325,7 +325,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime = int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec); taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo; SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo;
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
...@@ -800,7 +800,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo ...@@ -800,7 +800,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime = int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec); taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
pLocalReducer->rowSize); pLocalReducer->rowSize);
...@@ -944,7 +944,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo ...@@ -944,7 +944,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
while (1) { while (1) {
int32_t remains = taosNumOfRemainPoints(pInterpoInfo); int32_t remains = taosNumOfRemainPoints(pInterpoInfo);
TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime, TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->intervalTimeUnit, precision); pQueryInfo->slidingTimeUnit, precision);
int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime, int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity); pLocalReducer->resColModel->capacity);
...@@ -1296,7 +1296,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer ...@@ -1296,7 +1296,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t newTime = int64_t newTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision); taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime, taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime,
pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize); pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize);
...@@ -1326,7 +1326,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) { ...@@ -1326,7 +1326,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
int32_t remain = taosNumOfRemainPoints(pInterpoInfo); int32_t remain = taosNumOfRemainPoints(pInterpoInfo);
TSKEY ekey = TSKEY ekey =
taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, p); taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, p);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain, int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain,
pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity); pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
...@@ -1359,7 +1359,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { ...@@ -1359,7 +1359,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime; int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->intervalTimeUnit, precision); pQueryInfo->slidingTimeUnit, precision);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime, int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity); pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
......
...@@ -1681,7 +1681,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1681,7 +1681,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit; pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
if (pQueryInfo->intervalTime < 0) { if (pQueryInfo->intervalTime < 0) {
......
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
...@@ -513,7 +513,7 @@ typedef struct { ...@@ -513,7 +513,7 @@ typedef struct {
int16_t orderColId; int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode int16_t numOfCols; // the number of columns will be load from vnode
char intervalTimeUnit; // time interval type, for revisement of interval(1d) char slidingTimeUnit; // time interval type, for revisement of interval(1d)
int64_t intervalTime; // time interval for aggregation, in million second int64_t intervalTime; // time interval for aggregation, in million second
int64_t slidingTime; // value for sliding window int64_t slidingTime; // value for sliding window
......
...@@ -30,7 +30,7 @@ typedef struct SInterpolationInfo { ...@@ -30,7 +30,7 @@ typedef struct SInterpolationInfo {
char * prevValues; // previous row of data char * prevValues; // previous row of data
char * nextValues; // next row of data char * nextValues; // next row of data
int32_t numOfTags; int32_t numOfTags;
char ** pTags; // tags value for current interoplation char ** pTags; // tags value for current interpolation
} SInterpolationInfo; } SInterpolationInfo;
typedef struct SPoint { typedef struct SPoint {
...@@ -83,6 +83,8 @@ int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoTyp ...@@ -83,6 +83,8 @@ int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoTyp
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point); int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
int taosDoLinearInterpolationD(int32_t type, SPoint* point1, SPoint* point2, SPoint* point);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -167,6 +167,11 @@ typedef struct SExtTagsInfo { ...@@ -167,6 +167,11 @@ typedef struct SExtTagsInfo {
struct SQLFunctionCtx **pTagCtxList; struct SQLFunctionCtx **pTagCtxList;
} SExtTagsInfo; } SExtTagsInfo;
typedef struct SBoundaryData {
TSKEY key;
double data;
} SBoundaryData;
// sql function runtime context // sql function runtime context
typedef struct SQLFunctionCtx { typedef struct SQLFunctionCtx {
int32_t startOffset; int32_t startOffset;
...@@ -195,6 +200,8 @@ typedef struct SQLFunctionCtx { ...@@ -195,6 +200,8 @@ typedef struct SQLFunctionCtx {
SResultInfo *resultInfo; SResultInfo *resultInfo;
SExtTagsInfo tagInfo; SExtTagsInfo tagInfo;
SBoundaryData prev; // this value may be less or equalled to the start time of time window
SBoundaryData next; // this value may be greater or equalled to the end time of time window
} SQLFunctionCtx; } SQLFunctionCtx;
typedef struct SQLAggFuncElem { typedef struct SQLAggFuncElem {
......
...@@ -261,7 +261,7 @@ typedef struct SQuery { ...@@ -261,7 +261,7 @@ typedef struct SQuery {
TSKEY ekey; TSKEY ekey;
int64_t intervalTime; int64_t intervalTime;
int64_t slidingTime; // sliding time for sliding window query int64_t slidingTime; // sliding time for sliding window query
char intervalTimeUnit; // interval data type, used for daytime revise char slidingTimeUnit; // interval data type, used for daytime revise
int8_t precision; int8_t precision;
int16_t numOfOutputCols; int16_t numOfOutputCols;
int16_t interpoType; int16_t interpoType;
......
...@@ -85,12 +85,6 @@ typedef enum { ...@@ -85,12 +85,6 @@ typedef enum {
QUERY_NO_DATA_TO_CHECK = 0x8u, QUERY_NO_DATA_TO_CHECK = 0x8u,
} vnodeQueryStatus; } vnodeQueryStatus;
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
char** pPrevPoint;
char** pNextPoint;
} SPointInterpoSupporter;
typedef struct SBlockInfo { typedef struct SBlockInfo {
TSKEY keyFirst; TSKEY keyFirst;
TSKEY keyLast; TSKEY keyLast;
...@@ -285,6 +279,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); ...@@ -285,6 +279,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
SWindowResult* getWindowRes(SWindowResInfo* pWindowResInfo, size_t index);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -141,6 +141,12 @@ typedef struct SWindowResInfo { ...@@ -141,6 +141,12 @@ typedef struct SWindowResInfo {
int64_t threshold; // threshold for return completed results. int64_t threshold; // threshold for return completed results.
} SWindowResInfo; } SWindowResInfo;
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
char** pPrevPoint;
char** pNextPoint;
} SPointInterpoSupporter;
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
SPositionInfo startPos; /* the start position, used for secondary/third iteration */ SPositionInfo startPos; /* the start position, used for secondary/third iteration */
SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */ SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */
...@@ -172,6 +178,10 @@ typedef struct SQueryRuntimeEnv { ...@@ -172,6 +178,10 @@ typedef struct SQueryRuntimeEnv {
bool stableQuery; // is super table query or not bool stableQuery; // is super table query or not
SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool hasTimeWindow;
char** lastRowInBlock;
bool interpoSearch;
/* /*
* Temporarily hold the in-memory cache block info during scan cache blocks * Temporarily hold the in-memory cache block info during scan cache blocks
* Here we do not use the cache block info from pMeterObj, simple because it may change anytime * Here we do not use the cache block info from pMeterObj, simple because it may change anytime
......
...@@ -80,6 +80,17 @@ static int32_t getGroupResultId(int32_t groupIndex) { ...@@ -80,6 +80,17 @@ static int32_t getGroupResultId(int32_t groupIndex) {
return base + (groupIndex * 10000); return base + (groupIndex * 10000);
} }
static bool needsBoundaryTS(SQuery *pQuery) {
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_RATE) {
return true;
}
}
return false;
}
static FORCE_INLINE bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } static FORCE_INLINE bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
// check the offset value integrity // check the offset value integrity
...@@ -579,9 +590,9 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) { ...@@ -579,9 +590,9 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) {
return false; return false;
} }
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQueryTimestamp, void *inputData, static void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t StartQueryTimestamp, void *inputData,
char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull,
int32_t blockStatus, void *param, int32_t scanFlag); void *param);
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo); void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
...@@ -1518,14 +1529,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t ...@@ -1518,14 +1529,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
w.ekey = w.skey + pQuery->intervalTime - 1; w.ekey = w.skey + pQuery->intervalTime - 1;
} }
/*
* query border check, skey should not be bounded by the query time range, since the value skey will
* be used as the time window index value. So we only change ekey of time window accordingly.
*/
if (w.ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) {
w.ekey = pQuery->ekey;
}
assert(ts >= w.skey && ts <= w.ekey && w.skey != 0); assert(ts >= w.skey && ts <= w.ekey && w.skey != 0);
return w; return w;
...@@ -1651,8 +1654,12 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, ...@@ -1651,8 +1654,12 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
continue; continue;
} }
if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || /*
(pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { * when the ekey equals to lastKey of current block, do NOT close it, since the interpolation may
* be involved.
*/
if ((pResult->window.ekey < lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pResult->window.skey > lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeTimeWindow(pWindowResInfo, i); closeTimeWindow(pWindowResInfo, i);
} else { } else {
skey = pResult->window.skey; skey = pResult->window.skey;
...@@ -1742,7 +1749,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat ...@@ -1742,7 +1749,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
pCtx[k].size = forwardStep; pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? startPos : startPos - (forwardStep - 1); pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? startPos : startPos - (forwardStep - 1);
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0 || functionId == TSDB_FUNC_RATE) {
pCtx[k].ptsList = (TSKEY *)((char*)pRuntimeEnv->primaryColBuffer->data + pCtx[k].startOffset * TSDB_KEYSIZE); pCtx[k].ptsList = (TSKEY *)((char*)pRuntimeEnv->primaryColBuffer->data + pCtx[k].startOffset * TSDB_KEYSIZE);
} }
...@@ -1829,6 +1836,31 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow ...@@ -1829,6 +1836,31 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
} }
} }
static void handleInBlockEkeyInterpolation(SQueryRuntimeEnv* pRuntimeEnv, int32_t endPos,
const TSKEY* primaryKeyCol, STimeWindow* win, SQLFunctionCtx* pCtx) {
// this query time window ended in the current data block
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY lastKey = primaryKeyCol[endPos];
TSKEY e = win->skey + pQuery->intervalTime;
TSKEY next = primaryKeyCol[endPos + 1];
// the next key is beyond the query time range
if ((next > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (next > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) {
pCtx->next.key = -1;
return;
}
pCtx->next.key = e;
char *d = pCtx->aInputElemBuf + pCtx->inputBytes * endPos;
SPoint point1 = (SPoint){.key = lastKey, .val = d};
SPoint point2 = (SPoint){.key = next, .val = (d + pCtx->inputBytes)};
SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
}
static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) { static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
TSKEY ekey = -1; TSKEY ekey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
...@@ -1846,6 +1878,255 @@ static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) { ...@@ -1846,6 +1878,255 @@ static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
return ekey; return ekey;
} }
static void interpolateEndKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, STimeWindow* win,
int32_t endPos, SQLFunctionCtx* pCtx, int32_t index) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY *primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
// if current query window beyonds the whole query window, do not employ the interpolation
if ((win->ekey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(win->ekey >= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) {
pCtx->next.key = -1;
return;
}
if (!QUERY_IS_ASC_QUERY(pQuery) && win->skey >= pBlockInfo->keyLast) {
pCtx->next.key = -1;
return;
}
if (QUERY_IS_ASC_QUERY(pQuery)) {
/*
* the time window closed before current data block, use the interpolation to generate
* the final result part, endPos equals to -1 means that this time window ends before current data block.
*/
if (win->ekey < pBlockInfo->keyFirst) {
assert(endPos == -1);
TSKEY prev = *(int64_t*) pRuntimeEnv->lastRowInBlock[0];
TSKEY next = pBlockInfo->keyFirst;
pCtx->next.key = win->skey + pQuery->intervalTime;
char *d = pCtx->aInputElemBuf;
SPoint point1 = (SPoint){.key = prev, .val = pRuntimeEnv->lastRowInBlock[index]};
SPoint point2 = (SPoint){.key = next, .val = d};
SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
} else if (win->ekey < pBlockInfo->keyLast) {
handleInBlockEkeyInterpolation(pRuntimeEnv, endPos, primaryKeyCol, win, pCtx);
} else {
//do nothing now, the interpolation will be handled before processing the next data block
assert(win->ekey >= pBlockInfo->keyLast);
pCtx->next.key = -1;
}
} else { // desc order query
//the time window closed before current data block, use the interpolation to generate the final result part.
if (win->ekey >= pBlockInfo->keyLast) {
TSKEY prev = pBlockInfo->keyLast;
TSKEY next = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0];
pCtx->next.key = win->skey + pQuery->intervalTime;
char *d = pCtx->aInputElemBuf + (pBlockInfo->size - 1) * pCtx->inputBytes;
SPoint point1 = (SPoint){.key = prev, .val = d};
SPoint point2 = (SPoint){.key = next, .val = pRuntimeEnv->lastRowInBlock[index]};
SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
} else if (win->ekey < pBlockInfo->keyLast) {
handleInBlockEkeyInterpolation(pRuntimeEnv, endPos, primaryKeyCol, win, pCtx);
} else {
pCtx->next.key = -1;
}
}
}
static void handleInBlockSkeyInterpolation (SQueryRuntimeEnv* pRuntimeEnv, int32_t startPos,
const TSKEY* primaryKeyCol, STimeWindow* win, SQLFunctionCtx* pCtx) {
assert(startPos > 0);
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY prev = primaryKeyCol[startPos - 1];
TSKEY next = primaryKeyCol[startPos];
if (!QUERY_IS_ASC_QUERY(pQuery) && prev < pQuery->ekey) {
pCtx->prev.key = -1;
return;
}
pCtx->prev.key = win->skey;
char *d = pCtx->aInputElemBuf + pCtx->inputBytes * (startPos - 1);
SPoint point1 = (SPoint){.key = prev, .val = d};
SPoint point2 = (SPoint){.key = next, .val = (d + pCtx->inputBytes)};
SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
}
static void interpolateStartKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, SWindowResInfo* pWindowResInfo,
STimeWindow* win, int32_t startPos, SQLFunctionCtx* pCtx, int32_t index) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY *primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
TSKEY skey = primaryKeyCol[startPos];
/*
* no need the start time interpolation
* 1. current window is the first window in either ascending or descending order output
* 2. time window start exactly from a timestamp with data
*/
if (skey == win->skey || win->skey < pWindowResInfo->startTime ||
(win->skey <= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) ||
(win->skey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
pCtx->prev.key = -1;
return;
}
if (QUERY_IS_ASC_QUERY(pQuery)) {
// the queried time window and time window of data block must be intersect
// assert(win->ekey >= pBlockInfo->keyFirst && win->skey <= pBlockInfo->keyLast);
/*
* this win should not be the first time window that starts from a less timestamp than
* the skey of current data block
*/
if (win->skey < pBlockInfo->keyFirst) {
TSKEY prev = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0];
TSKEY next = pBlockInfo->keyFirst;
pCtx->prev.key = win->skey;
char *d = pCtx->aInputElemBuf;
SPoint point1 = (SPoint){.key = prev, .val = pRuntimeEnv->lastRowInBlock[index]};
SPoint point2 = (SPoint){.key = next, .val = d};
SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
} else {
handleInBlockSkeyInterpolation(pRuntimeEnv, startPos, primaryKeyCol, win, pCtx);
}
} else { // desc order
if (win->skey > pBlockInfo->keyLast) {
//this pBlockInfo located before current time window
TSKEY prev = pBlockInfo->keyLast;
TSKEY next = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0];
pCtx->prev.key = win->skey;
char *d = pCtx->aInputElemBuf + (pBlockInfo->size - 1) * pCtx->inputBytes;
SPoint point1 = (SPoint){.key = prev, .val = d};
SPoint point2 = (SPoint){.key = next, .val = pRuntimeEnv->lastRowInBlock[index]};
SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
} else {
// the queried time window and time window of data block must be intersect
assert(win->ekey >= pBlockInfo->keyFirst && win->skey <= pBlockInfo->keyLast);
if (win->skey >= pBlockInfo->keyFirst) {
// the pBlockInfo is intersected with query time window
handleInBlockSkeyInterpolation(pRuntimeEnv, startPos, primaryKeyCol, win, pCtx);
} else {
assert(win->skey < pBlockInfo->keyFirst && win->ekey >= pBlockInfo->keyFirst);
pCtx->prev.key = -1;
}
}
}
}
static void doSetInterpolationDataForTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo *pWindowResInfo,
SBlockInfo* pBlockInfo, STimeWindow* win, int32_t startPos, int32_t forwardStep) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (!pRuntimeEnv->interpoSearch) {
return;
}
int32_t s = startPos;
int32_t e = forwardStep * step + startPos - step;
if (!QUERY_IS_ASC_QUERY(pQuery)) {
SWAP(s, e, int32_t);
}
// interpolate for skey value
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
if (pQuery->pSelectExpr[i].pBase.functionId != TSDB_FUNC_RATE) {
continue;
}
SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo;
interpolateStartKeyValue(pRuntimeEnv, pBlockInfo, pWindowResInfo, win, s, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf);
}
// interpolate for ekey value
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
if (pQuery->pSelectExpr[i].pBase.functionId != TSDB_FUNC_RATE) {
continue;
}
SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo;
interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf);
}
}
static void doInterpolatePrevTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo, SBlockInfo* pBlockInfo,
TSKEY ts, int32_t offset, STimeWindow* win) {
// get current not closed time window
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t slot = pWindowResInfo->curIndex;
if (slot == -1 || !pRuntimeEnv->interpoSearch) {
return;
}
while (slot < pWindowResInfo->size) {
STimeWindow w = getWindowResult(pWindowResInfo, slot)->window;
if (w.skey == win->skey) {
assert(w.ekey == win->ekey);
break;
}
// do not check for the closed time window
SWindowResult* pWindowRes = getWindowRes(pWindowResInfo, slot);
if (pWindowRes->status.closed) {
slot += 1;
continue;
}
// if current active window locates before current data block, do interpolate the result and close it
assert((w.skey < win->skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) ||
(w.skey > win->skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery)));
int32_t forwardStep = 0;
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep);
// set correct output buffer for interplate result. todo handle error
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &w) != TSDB_CODE_SUCCESS) {
continue;
}
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, slot);
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep);
closeTimeWindow(pWindowResInfo, slot);
// try next time window
slot += 1;
}
}
/** /**
* *
* @param pRuntimeEnv * @param pRuntimeEnv
...@@ -1872,7 +2153,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -1872,7 +2153,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
SField dummyField = {0}; SField dummyField = {0};
bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep);
...@@ -1890,29 +2170,33 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -1890,29 +2170,33 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
} }
} }
setExecParams(pQuery, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, setExecParams(pRuntimeEnv, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField,
hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); hasNull, &sasArray[k]);
} }
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
TSKEY ts = primaryKeyCol[offset]; TSKEY ts = primaryKeyCol[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
doInterpolatePrevTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, ts, offset, &win);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) { if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) {
return 0; return 0;
} }
TSKEY ekey = reviseWindowEkey(pQuery, &win); TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true); forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true);
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, offset, forwardStep);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep);
int32_t index = pWindowResInfo->curIndex; int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
int32_t startPos = int32_t startPos =
getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, pBlockInfo, primaryKeyCol, searchFn); getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, pBlockInfo, primaryKeyCol, searchFn);
...@@ -1928,7 +2212,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -1928,7 +2212,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
ekey = reviseWindowEkey(pQuery, &nextWin); ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true); forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true);
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &nextWin, startPos, forwardStep);
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep);
} }
...@@ -1942,6 +2228,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -1942,6 +2228,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
*/ */
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
pCtx[k].next.key = -1;
pCtx[k].prev.key = -1;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]); aAggs[functionId].xFunction(&pCtx[k]);
} }
...@@ -1956,6 +2245,14 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -1956,6 +2245,14 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
if (!isIntervalQuery(pQuery)) { if (!isIntervalQuery(pQuery)) {
num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; num = getNumOfResult(pRuntimeEnv) - prevNumOfRes;
} }
// save the last row in current data block
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo* pColInfo = &pQuery->colList[i].data;
int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0;
memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes);
}
tfree(sasArray); tfree(sasArray);
return (int32_t)num; return (int32_t)num;
...@@ -2166,6 +2463,31 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { ...@@ -2166,6 +2463,31 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
} }
} }
SWindowResult* getWindowRes(SWindowResInfo* pWindowResInfo, size_t index) {
assert(index < pWindowResInfo->size);
return &pWindowResInfo->pResult[index];
}
/*
* remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time
*/
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
int32_t i = 0;
while(i < pWindowResInfo->size &&
((pWindowResInfo->pResult[i].window.ekey < lastKey && order == QUERY_ASC_FORWARD_STEP) ||
(pWindowResInfo->pResult[i].window.skey > lastKey && order == QUERY_DESC_FORWARD_STEP))) {
++i;
}
// assert(i < pWindowResInfo->size);
if (i < pWindowResInfo->size) {
pWindowResInfo->size = (i + 1);
}
}
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) {
if (isNull(pData, type)) { // ignore the null value if (isNull(pData, type)) { // ignore the null value
return -1; return -1;
...@@ -2311,10 +2633,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2311,10 +2633,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep);
TSKEY ts = pQuery->skey; // QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalWindow.skey : TSKEY ts = pQuery->skey;
// pRuntimeEnv->intervalWindow.ekey; setExecParams(pRuntimeEnv, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull,
setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull, &sasArray[k]);
pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
} }
// set the input column data // set the input column data
...@@ -2340,7 +2661,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2340,7 +2661,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
int32_t j = 0; int32_t j = 0;
TSKEY lastKey = -1; TSKEY lastKey = -1;
int32_t lastIndex = -1;
bool firstAccessedPoint = true;
for (j = 0; j < (*forwardStep); ++j) { for (j = 0; j < (*forwardStep); ++j) {
int32_t offset = GET_COL_DATA_POS(pQuery, j, step); int32_t offset = GET_COL_DATA_POS(pQuery, j, step);
...@@ -2362,8 +2685,31 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2362,8 +2685,31 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
// interval window query // interval window query
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
// decide the time window according to the primary timestamp // decide the time window according to the primary timestamp
int64_t ts = primaryKeyCol[offset]; TSKEY ts = primaryKeyCol[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
// if (firstAccessedPoint) {
// doInterpolatePrevTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, ts, offset, &win);
// firstAccessedPoint = false;
// } else {
// int32_t index = pWindowResInfo->curIndex;
// STimeWindow w = getWindowResult(pWindowResInfo, index)->window;
//
// if (w.skey == win.skey) { // do nothing
// assert(w.ekey == win.ekey);
// } else {
// assert((w.skey < win.skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) ||
// (w.skey > win.skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery)));
//
// // set the endkey interpolation for the previous
// for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
// SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo;
//
// interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf);
// }
//
// }
// }
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win); int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
...@@ -2377,6 +2723,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2377,6 +2723,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset); doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset);
lastKey = ts; lastKey = ts;
lastIndex = j;
STimeWindow nextWin = win; STimeWindow nextWin = win;
int32_t index = pWindowResInfo->curIndex; int32_t index = pWindowResInfo->curIndex;
int32_t sid = pRuntimeEnv->pMeterObj->sid; int32_t sid = pRuntimeEnv->pMeterObj->sid;
...@@ -2421,6 +2769,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2421,6 +2769,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
pCtx[k].next.key = -1;
pCtx[k].prev.key = -1;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset); aAggs[functionId].xFunctionF(&pCtx[k], offset);
} }
...@@ -2434,7 +2785,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2434,7 +2785,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
break; break;
} }
} }
/* /*
* pointsOffset is the maximum available space in result buffer update the actual forward step for query that * pointsOffset is the maximum available space in result buffer update the actual forward step for query that
* requires checking buffer during loop * requires checking buffer during loop
...@@ -2445,6 +2796,15 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2445,6 +2796,15 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
break; break;
} }
} }
// save the last accessed row of current data block for interpolation
int32_t index = GET_COL_DATA_POS(pQuery, lastIndex, step);
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo* pColInfo = &pQuery->colList[i].data;
int32_t s = pColInfo->bytes * index;
memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes);
}
free(sasArray); free(sasArray);
...@@ -2657,11 +3017,24 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter ...@@ -2657,11 +3017,24 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter
return fileIndex; return fileIndex;
} }
void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimestamp, void *inputData, static void getOneRowFromDataBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, int32_t pos) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
int32_t bytes = pQuery->colList[i].data.bytes;
memcpy(dst[i], pRuntimeEnv->colDataBuffer[i]->data + pos * bytes, bytes);
}
}
void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t startQueryTimestamp, void *inputData,
char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull,
int32_t blockStatus, void *param, int32_t scanFlag) { void *param) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (size - 1); int32_t startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (size - 1);
int32_t scanFlag = pRuntimeEnv->scanFlag;
int32_t blockStatus = pRuntimeEnv->blockStatus;
pCtx->nStartQueryTimestamp = startQueryTimestamp; pCtx->nStartQueryTimestamp = startQueryTimestamp;
pCtx->scanFlag = scanFlag; pCtx->scanFlag = scanFlag;
...@@ -2913,6 +3286,12 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2913,6 +3286,12 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->pInterpoBuf); tfree(pRuntimeEnv->pInterpoBuf);
} }
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
tfree(pRuntimeEnv->lastRowInBlock[i]);
}
tfree(pRuntimeEnv->lastRowInBlock);
destroyDiskbasedResultBuf(pRuntimeEnv->pResultBuf); destroyDiskbasedResultBuf(pRuntimeEnv->pResultBuf);
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
} }
...@@ -2924,6 +3303,7 @@ static int64_t getOldestKey(int32_t numOfFiles, int64_t fileId, SVnodeCfg *pCfg) ...@@ -2924,6 +3303,7 @@ static int64_t getOldestKey(int32_t numOfFiles, int64_t fileId, SVnodeCfg *pCfg)
} }
bool isQueryKilled(SQuery *pQuery) { bool isQueryKilled(SQuery *pQuery) {
return false;
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
/* /*
...@@ -3324,7 +3704,7 @@ void vnodeCheckIfDataExists(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, ...@@ -3324,7 +3704,7 @@ void vnodeCheckIfDataExists(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj,
void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast, void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast,
int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey) { int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey) {
assert(pKey >= keyFirst && pKey <= keyLast); assert(pKey >= keyFirst && pKey <= keyLast);
*skey = taosGetIntervalStartTimestamp(pKey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); *skey = taosGetIntervalStartTimestamp(pKey, pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision);
if (keyFirst > (INT64_MAX - pQuery->intervalTime)) { if (keyFirst > (INT64_MAX - pQuery->intervalTime)) {
/* /*
...@@ -3355,13 +3735,62 @@ void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t ke ...@@ -3355,13 +3735,62 @@ void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t ke
} }
} }
static void getOneRowFromDataBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, int32_t pos) { static bool loadPrevDataPoint(SQueryRuntimeEnv* pRuntimeEnv, char** result) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
SMeterObj* pMeterObj = pRuntimeEnv->pMeterObj;
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
int32_t bytes = pQuery->colList[i].data.bytes; /* the qualified point is not the first point in data block */
memcpy(dst[i], pRuntimeEnv->colDataBuffer[i]->data + pos * bytes, bytes); if (pQuery->pos > 0) {
int32_t prevPos = pQuery->pos - 1;
/* save the point that is directly after the specified point */
getOneRowFromDataBlock(pRuntimeEnv, result, prevPos);
} else {
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos);
// backwards movement would not set the pQuery->pos correct. We need to set it manually later.
moveToNextBlock(pRuntimeEnv, QUERY_DESC_FORWARD_STEP, searchFn, true);
/*
* no previous data exists.
* reset the status and load the data block that contains the qualified point
*/
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
dTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%" PRId64 "-%" PRId64
", out of range",
GET_QINFO_ADDR(pQuery), pRuntimeEnv->startPos.fileId, pRuntimeEnv->startPos.slot,
pRuntimeEnv->startPos.pos, pQuery->skey, pQuery->ekey);
// no result, return immediately
setQueryStatus(pQuery, QUERY_COMPLETED);
return false;
} else { // prev has been located
if (pQuery->fileId >= 0) {
pQuery->pos = pQuery->pBlock[pQuery->slot].numOfPoints - 1;
getOneRowFromDataBlock(pRuntimeEnv, result, pQuery->pos);
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
pQuery->fileId, pQuery->slot, pQuery->pos, pQuery->pos);
// restore to the start position
loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->startPos);
} else {
// moveToNextBlock make sure there is a available cache block, if exists
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
SCacheBlock* pBlock = &pRuntimeEnv->cacheBlock;
pQuery->pos = pBlock->numOfPoints - 1;
getOneRowFromDataBlock(pRuntimeEnv, result, pQuery->pos);
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos);
}
}
} }
return true;
} }
static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMeterObj, static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMeterObj,
...@@ -3381,10 +3810,8 @@ static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMet ...@@ -3381,10 +3810,8 @@ static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMet
} else { } else {
assert(QUERY_IS_ASC_QUERY(pQuery)); assert(QUERY_IS_ASC_QUERY(pQuery));
} }
assert(pPointInterpSupporter != NULL && pQuery->skey == pQuery->ekey); assert(pPointInterpSupporter != NULL && pQuery->skey == pQuery->ekey);
SCacheBlock *pBlock = NULL;
qTrace("QInfo:%p get next data point, fileId:%d, slot:%d, pos:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId, qTrace("QInfo:%p get next data point, fileId:%d, slot:%d, pos:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId,
pQuery->slot, pQuery->pos); pQuery->slot, pQuery->pos);
...@@ -3413,55 +3840,9 @@ static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMet ...@@ -3413,55 +3840,9 @@ static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMet
} }
return true; return true;
} }
/* the qualified point is not the first point in data block */ loadPrevDataPoint(pRuntimeEnv, pPointInterpSupporter->pPrevPoint);
if (pQuery->pos > 0) {
int32_t prevPos = pQuery->pos - 1;
/* save the point that is directly after the specified point */
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, prevPos);
} else {
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos);
// backwards movement would not set the pQuery->pos correct. We need to set it manually later.
moveToNextBlock(pRuntimeEnv, QUERY_DESC_FORWARD_STEP, searchFn, true);
/*
* no previous data exists.
* reset the status and load the data block that contains the qualified point
*/
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
dTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%" PRId64 "-%" PRId64
", out of range",
GET_QINFO_ADDR(pQuery), pRuntimeEnv->startPos.fileId, pRuntimeEnv->startPos.slot,
pRuntimeEnv->startPos.pos, pQuery->skey, pQuery->ekey);
// no result, return immediately
setQueryStatus(pQuery, QUERY_COMPLETED);
return false;
} else { // prev has been located
if (pQuery->fileId >= 0) {
pQuery->pos = pQuery->pBlock[pQuery->slot].numOfPoints - 1;
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
pQuery->fileId, pQuery->slot, pQuery->pos, pQuery->pos);
} else {
// moveToNextBlock make sure there is a available cache block, if exists
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
pBlock = &pRuntimeEnv->cacheBlock;
pQuery->pos = pBlock->numOfPoints - 1;
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos);
}
}
}
pQuery->skey = *(TSKEY *)pPointInterpSupporter->pPrevPoint[0]; pQuery->skey = *(TSKEY *)pPointInterpSupporter->pPrevPoint[0];
pQuery->ekey = *(TSKEY *)pPointInterpSupporter->pNextPoint[0]; pQuery->ekey = *(TSKEY *)pPointInterpSupporter->pNextPoint[0];
pQuery->lastKey = pQuery->skey; pQuery->lastKey = pQuery->skey;
...@@ -3635,8 +4016,17 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, STableQuerySup ...@@ -3635,8 +4016,17 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, STableQuerySup
if (key != NULL) { if (key != NULL) {
*key = nextKey; *key = nextKey;
} }
return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter); // needs the data before the begin timestamp of query time window
if (nextKey != pQuery->skey) {
if (!pRuntimeEnv->hasTimeWindow) {
pQuery->skey = nextKey; // change the query skey
pQuery->lastKey = pQuery->skey;
}
return true;
} else {
return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter);
}
} }
// set no data in file // set no data in file
...@@ -4223,63 +4613,6 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj * ...@@ -4223,63 +4613,6 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj *
} }
} }
// if (win.ekey <= blockInfo.keyLast) {
// pQuery->limit.offset -= 1;
//
// if (win.ekey == blockInfo.keyLast) {
// moveToNextBlock(pRuntimeEnv, step, searchFn, false);
// if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
// break;
// }
//
// // next block does not included in time range, abort query
// blockInfo = getBlockInfo(pRuntimeEnv);
// if ((blockInfo.keyFirst > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
// (blockInfo.keyLast < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// break;
// }
//
// // set the window that start from the next data block
// win = getActiveTimeWindow(pWindowResInfo, blockInfo.keyFirst, pQuery);
// } else {
// // the time window is closed in current data block, load disk file block into memory to
// // check the next time window
// if (IS_DISK_DATA_BLOCK(pQuery)) {
// getTimestampInDiskBlock(pRuntimeEnv, 0);
// }
//
// STimeWindow nextWin = win;
// int32_t startPos =
// getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, &blockInfo, primaryKey, searchFn);
//
// if (startPos < 0) { // failed to find the qualified time window
// assert((nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
// (nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery)));
//
// setQueryStatus(pQuery, QUERY_COMPLETED);
// break;
// } else { // set the abort info
// pQuery->pos = startPos;
// pQuery->lastKey = primaryKey[startPos];
// win = nextWin;
// }
// }
//
// continue;
// }
//
// moveToNextBlock(pRuntimeEnv, step, searchFn, false);
// if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
// break;
// }
//
// blockInfo = getBlockInfo(pRuntimeEnv);
// if ((blockInfo.keyFirst > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
// (blockInfo.keyLast < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// break;
// }
} }
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED) || pQuery->limit.offset > 0) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED) || pQuery->limit.offset > 0) {
...@@ -4468,7 +4801,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI ...@@ -4468,7 +4801,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
} }
void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSupport) { void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSupport) {
if (isPointInterpoQuery(pQuery)) { if (isPointInterpoQuery(pQuery) || needsBoundaryTS(pQuery)) {
pInterpoSupport->pPrevPoint = malloc(pQuery->numOfCols * POINTER_BYTES); pInterpoSupport->pPrevPoint = malloc(pQuery->numOfCols * POINTER_BYTES);
pInterpoSupport->pNextPoint = malloc(pQuery->numOfCols * POINTER_BYTES); pInterpoSupport->pNextPoint = malloc(pQuery->numOfCols * POINTER_BYTES);
...@@ -4547,12 +4880,15 @@ static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *p ...@@ -4547,12 +4880,15 @@ static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *p
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
// To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system. // To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system.
pRuntimeEnv->lastRowInBlock = calloc(pQuery->numOfCols, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfCols; ++i) { for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
int32_t bytes = pQuery->colList[i].data.bytes; int32_t bytes = pQuery->colList[i].data.bytes;
pRuntimeEnv->colDataBuffer[i] = calloc(1, sizeof(SData) + EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes); pRuntimeEnv->colDataBuffer[i] = calloc(1, sizeof(SData) + EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes);
if (pRuntimeEnv->colDataBuffer[i] == NULL) { if (pRuntimeEnv->colDataBuffer[i] == NULL) {
goto _error_clean; goto _error_clean;
} }
pRuntimeEnv->lastRowInBlock[i] = calloc(1, bytes);
} }
// record the maximum column width among columns of this meter/metric // record the maximum column width among columns of this meter/metric
...@@ -4666,7 +5002,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery ...@@ -4666,7 +5002,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pMeterObj = pMeterObj; pRuntimeEnv->pMeterObj = pMeterObj;
pRuntimeEnv->hasTimeWindow = !notHasQueryTimeRange(pQuery);
pRuntimeEnv->interpoSearch = needsBoundaryTS(pQuery);
if ((code = allocateRuntimeEnvBuf(pRuntimeEnv, pMeterObj)) != TSDB_CODE_SUCCESS) { if ((code = allocateRuntimeEnvBuf(pRuntimeEnv, pMeterObj)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -4721,9 +5059,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery ...@@ -4721,9 +5059,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
/* query on single table */ /* query on single table */
pSupporter->numOfMeters = 1; pSupporter->numOfMeters = 1;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
SPointInterpoSupporter interpInfo = {0}; SPointInterpoSupporter interpoSupporter = {0};
pointInterpSupporterInit(pQuery, &interpInfo); pointInterpSupporterInit(pQuery, &interpoSupporter);
/* /*
* in case of last_row query without query range, we set the query timestamp to * in case of last_row query without query range, we set the query timestamp to
...@@ -4731,11 +5069,11 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery ...@@ -4731,11 +5069,11 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
*/ */
if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) { if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) {
if (!normalizeUnBoundLastRowQuery(pSupporter, &interpInfo)) { if (!normalizeUnBoundLastRowQuery(pSupporter, &interpoSupporter)) {
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
pQInfo->over = 1; pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo); pointInterpSupporterDestroy(&interpoSupporter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { // find the skey and ekey in case of sliding query } else { // find the skey and ekey in case of sliding query
...@@ -4749,23 +5087,34 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery ...@@ -4749,23 +5087,34 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
} }
int64_t skey = 0; int64_t skey = 0;
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &skey) == false) || if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpoSupporter, &skey) == false) ||
(isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) ||
(isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) { (isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) {
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
pQInfo->over = 1; pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo); pointInterpSupporterDestroy(&interpoSupporter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pQuery->skey = skey;
if (!QUERY_IS_ASC_QUERY(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) {
win.skey = minKey; win.skey = minKey;
win.ekey = skey; win.ekey = skey;
pQuery->ekey = minKey;
} else { } else {
win.skey = skey; win.skey = skey;
win.ekey = pQuery->ekey; win.ekey = pQuery->ekey;
} }
// empty result
if (QUERY_IS_ASC_QUERY(pQuery) && win.skey > win.ekey) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
pointInterpSupporterDestroy(&interpoSupporter);
return TSDB_CODE_SUCCESS;
}
TSKEY skey1, ekey1; TSKEY skey1, ekey1;
TSKEY windowSKey = 0, windowEKey = 0; TSKEY windowSKey = 0, windowEKey = 0;
...@@ -4784,13 +5133,13 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery ...@@ -4784,13 +5133,13 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
pQuery->over = QUERY_NOT_COMPLETED; pQuery->over = QUERY_NOT_COMPLETED;
} else { } else {
int64_t ekey = 0; int64_t ekey = 0;
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &ekey) == false) || if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpoSupporter, &ekey) == false) ||
(isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) ||
(isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) { (isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) {
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
pQInfo->over = 1; pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo); pointInterpSupporterDestroy(&interpoSupporter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -4800,14 +5149,14 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery ...@@ -4800,14 +5149,14 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
* here we set the value for before and after the specified time into the * here we set the value for before and after the specified time into the
* parameter for interpolation query * parameter for interpolation query
*/ */
pointInterpSupporterSetData(pQInfo, &interpInfo); pointInterpSupporterSetData(pQInfo, &interpoSupporter);
pointInterpSupporterDestroy(&interpInfo); pointInterpSupporterDestroy(&interpoSupporter);
if (!forwardQueryStartPosIfNeeded(pQInfo, pSupporter, dataInDisk, dataInCache)) { if (!forwardQueryStartPosIfNeeded(pQInfo, pSupporter, dataInDisk, dataInCache)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int64_t rs = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime, pQuery->intervalTimeUnit, int64_t rs = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime, pQuery->slidingTimeUnit,
pQuery->precision); pQuery->precision);
taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0); taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0);
allocMemForInterpo(pSupporter, pQuery, pMeterObj); allocMemForInterpo(pSupporter, pQuery, pMeterObj);
...@@ -4888,6 +5237,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { ...@@ -4888,6 +5237,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pSupporter->rawEKey = pQuery->ekey; pSupporter->rawEKey = pQuery->ekey;
pSupporter->rawSKey = pQuery->skey; pSupporter->rawSKey = pQuery->skey;
pQuery->lastKey = pQuery->skey; pQuery->lastKey = pQuery->skey;
pRuntimeEnv->interpoSearch = needsBoundaryTS(pQuery);
// create runtime environment // create runtime environment
SColumnModel *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel; SColumnModel *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel;
...@@ -4943,7 +5293,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { ...@@ -4943,7 +5293,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
} }
TSKEY revisedStime = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime, TSKEY revisedStime = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime,
pQuery->intervalTimeUnit, pQuery->precision); pQuery->slidingTimeUnit, pQuery->precision);
taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0);
pRuntimeEnv->stableQuery = true; pRuntimeEnv->stableQuery = true;
...@@ -5389,6 +5739,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -5389,6 +5739,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
closeAllTimeWindow(&pRuntimeEnv->windowResInfo); closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step);
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1;
} else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed } else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed
SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv); SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv);
...@@ -5448,7 +5800,6 @@ void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SM ...@@ -5448,7 +5800,6 @@ void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SM
} }
// set the join tag for first column // set the join tag for first column
SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIdx == PRIMARYKEY_TIMESTAMP_COL_INDEX && if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIdx == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf != NULL) {
assert(pFuncMsg->numOfParams == 1); assert(pFuncMsg->numOfParams == 1);
...@@ -5474,9 +5825,6 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes ...@@ -5474,9 +5825,6 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
pCtx[i].hasNull = true; pCtx[i].hasNull = true;
pCtx[i].nStartQueryTimestamp = timestamp; pCtx[i].nStartQueryTimestamp = timestamp;
pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes); pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes);
// pCtx[i].aInputElemBuf = ((char *)inputSrc->data) +
// ((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) +
// pCtx[i].outputBytes * inputIdx;
// in case of tag column, the tag information should be extracted from input buffer // in case of tag column, the tag information should be extracted from input buffer
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) {
...@@ -7172,7 +7520,6 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO ...@@ -7172,7 +7520,6 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO
doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey); doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey);
pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp
// assert(pWindowResInfo->startTime > 0);
if (pWindowResInfo->prevSKey == 0) { if (pWindowResInfo->prevSKey == 0) {
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
...@@ -7477,6 +7824,8 @@ void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataIn ...@@ -7477,6 +7824,8 @@ void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataIn
updateWindowResNumOfRes(pRuntimeEnv, pMeterDataInfo); updateWindowResNumOfRes(pRuntimeEnv, pMeterDataInfo);
updatelastkey(pQuery, pMeterQueryInfo); updatelastkey(pQuery, pMeterQueryInfo);
doCheckQueryCompleted(pRuntimeEnv, pMeterQueryInfo->lastKey, pWindowResInfo);
} }
// we need to split the refstatsult into different packages. // we need to split the refstatsult into different packages.
...@@ -7536,7 +7885,7 @@ bool vnodeHasRemainResults(void *handle) { ...@@ -7536,7 +7885,7 @@ bool vnodeHasRemainResults(void *handle) {
// query has completed // query has completed
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->intervalTime, TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->intervalTime,
pQuery->intervalTimeUnit, pQuery->precision); pQuery->slidingTimeUnit, pQuery->precision);
int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pRuntimeEnv->pInterpoBuf[0]->data, int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pRuntimeEnv->pInterpoBuf[0]->data,
remain, pQuery->intervalTime, ekey, pQuery->pointsToRead); remain, pQuery->intervalTime, ekey, pQuery->pointsToRead);
return numOfTotal > 0; return numOfTotal > 0;
...@@ -7647,7 +7996,7 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage ...@@ -7647,7 +7996,7 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo);
TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->intervalTime, TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->intervalTime,
pQuery->intervalTimeUnit, pQuery->precision); pQuery->slidingTimeUnit, pQuery->precision);
int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data, int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data,
numOfRows, pQuery->intervalTime, ekey, pQuery->pointsToRead); numOfRows, pQuery->intervalTime, ekey, pQuery->pointsToRead);
......
...@@ -269,7 +269,7 @@ static SQInfo *vnodeAllocateQInfoEx(SQueryMeterMsg *pQueryMsg, SSqlGroupbyExpr * ...@@ -269,7 +269,7 @@ static SQInfo *vnodeAllocateQInfoEx(SQueryMeterMsg *pQueryMsg, SSqlGroupbyExpr *
pQuery->intervalTime = pQueryMsg->intervalTime; pQuery->intervalTime = pQueryMsg->intervalTime;
pQuery->slidingTime = pQueryMsg->slidingTime; pQuery->slidingTime = pQueryMsg->slidingTime;
pQuery->interpoType = pQueryMsg->interpoType; pQuery->interpoType = pQueryMsg->interpoType;
pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit; pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
pQInfo->query.pointsToRead = vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock; pQInfo->query.pointsToRead = vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock;
......
...@@ -22,12 +22,12 @@ ...@@ -22,12 +22,12 @@
#define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC) #define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC)
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision) { int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char slidingTimeUnit, int16_t precision) {
if (timeRange == 0) { if (timeRange == 0) {
return startTime; return startTime;
} }
if (intervalTimeUnit == 'a' || intervalTimeUnit == 'm' || intervalTimeUnit == 's' || intervalTimeUnit == 'h') { if (slidingTimeUnit == 'a' || slidingTimeUnit == 'm' || slidingTimeUnit == 's' || slidingTimeUnit == 'h') {
return (startTime / timeRange) * timeRange; return (startTime / timeRange) * timeRange;
} else { } else {
/* /*
...@@ -95,11 +95,11 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD ...@@ -95,11 +95,11 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD
pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows; pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows;
} }
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision) { TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t slidingTimeUnit, int8_t precision) {
if (order == TSQL_SO_ASC) { if (order == TSQL_SO_ASC) {
return ekey; return ekey;
} else { } else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, intervalTimeUnit, precision); return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
} }
} }
...@@ -191,6 +191,49 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi ...@@ -191,6 +191,49 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
return 0; return 0;
} }
int taosDoLinearInterpolationD(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
switch (type) {
case TSDB_DATA_TYPE_INT: {
*(double*) point->val = doLinearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key,
point2->key, point->key);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
*(double*)point->val =
doLinearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_DOUBLE: {
*(double*)point->val =
doLinearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
*(double*)point->val = doLinearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key,
point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_SMALLINT: {
*(double*)point->val = doLinearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key,
point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_TINYINT: {
*(double*)point->val =
doLinearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key);
break;
};
default: {
// TODO: Deal with interpolation with bool and strings and timestamp
return -1;
}
}
return 0;
}
static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; } static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; }
static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order, static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册