提交 cb00cc8c 编写于 作者: H hjxilinx

[TD-28]fix rate function bugs in reversed traverse query, and interpolation...

[TD-28]fix rate function bugs in reversed traverse query, and interpolation for start and end timestamp of query time window
上级 6c90a198
...@@ -197,7 +197,7 @@ typedef struct SDataBlockList { ...@@ -197,7 +197,7 @@ typedef struct SDataBlockList {
typedef struct SQueryInfo { typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately. int16_t command; // the command may be different for each subclause, so keep it seperately.
uint16_t type; // query/insert/import type uint16_t type; // query/insert/import type
char intervalTimeUnit; char slidingTimeUnit;
int64_t etime, stime; int64_t etime, stime;
int64_t intervalTime; // aggregation time interval int64_t intervalTime; // aggregation time interval
......
...@@ -4400,7 +4400,7 @@ static double do_calc_rate(const SRateInfo* pRateInfo) { ...@@ -4400,7 +4400,7 @@ static double do_calc_rate(const SRateInfo* pRateInfo) {
} }
} }
double duration = (pRateInfo->lastKey - pRateInfo->firstKey) / 1000; double duration = (pRateInfo->lastKey - pRateInfo->firstKey) / 1000.0;
double resultVal = diff / duration; double resultVal = diff / duration;
pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%f lastValue:%f CorrectionValue:%f resultVal:%f", pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%f lastValue:%f CorrectionValue:%f resultVal:%f",
......
...@@ -598,9 +598,6 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { ...@@ -598,9 +598,6 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000; pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000;
} }
/* parser has filter the illegal type, no need to check here */
pQueryInfo->intervalTimeUnit = pQuerySql->interval.z[pQuerySql->interval.n - 1];
// interval cannot be less than 10 milliseconds // interval cannot be less than 10 milliseconds
if (pQueryInfo->intervalTime < tsMinIntervalTime) { if (pQueryInfo->intervalTime < tsMinIntervalTime) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2); return invalidSqlErrMsg(pQueryInfo->msg, msg2);
...@@ -689,10 +686,15 @@ int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { ...@@ -689,10 +686,15 @@ int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) { if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1); return invalidSqlErrMsg(pQueryInfo->msg, msg1);
} }
pQueryInfo->slidingTimeUnit = pQuerySql->sliding.z[pQuerySql->sliding.n - 1];
} else { } else {
pQueryInfo->slidingTime = pQueryInfo->intervalTime; pQueryInfo->slidingTime = pQueryInfo->intervalTime;
// parser has filter the illegal type, no need to check here
pQueryInfo->slidingTimeUnit = pQuerySql->interval.z[pQuerySql->interval.n - 1];
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -324,7 +324,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -324,7 +324,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime = int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec); taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo; SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo;
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
...@@ -779,7 +779,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo ...@@ -779,7 +779,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime = int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec); taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
pLocalReducer->rowSize); pLocalReducer->rowSize);
...@@ -923,7 +923,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo ...@@ -923,7 +923,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
while (1) { while (1) {
int32_t remains = taosNumOfRemainPoints(pInterpoInfo); int32_t remains = taosNumOfRemainPoints(pInterpoInfo);
TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime, TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->intervalTimeUnit, precision); pQueryInfo->slidingTimeUnit, precision);
int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime, int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity); pLocalReducer->resColModel->capacity);
...@@ -1275,7 +1275,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer ...@@ -1275,7 +1275,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t newTime = int64_t newTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision); taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime, taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime,
pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize); pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize);
...@@ -1305,7 +1305,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) { ...@@ -1305,7 +1305,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
int32_t remain = taosNumOfRemainPoints(pInterpoInfo); int32_t remain = taosNumOfRemainPoints(pInterpoInfo);
TSKEY ekey = TSKEY ekey =
taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, p); taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, p);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain, int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain,
pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity); pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
...@@ -1338,7 +1338,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { ...@@ -1338,7 +1338,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime; int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->intervalTimeUnit, precision); pQueryInfo->slidingTimeUnit, precision);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime, int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity); pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
......
...@@ -1680,7 +1680,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1680,7 +1680,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit; pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
if (pQueryInfo->intervalTime < 0) { if (pQueryInfo->intervalTime < 0) {
......
...@@ -513,7 +513,7 @@ typedef struct { ...@@ -513,7 +513,7 @@ typedef struct {
int16_t orderColId; int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode int16_t numOfCols; // the number of columns will be load from vnode
char intervalTimeUnit; // time interval type, for revisement of interval(1d) char slidingTimeUnit; // time interval type, for revisement of interval(1d)
int64_t intervalTime; // time interval for aggregation, in million second int64_t intervalTime; // time interval for aggregation, in million second
int64_t slidingTime; // value for sliding window int64_t slidingTime; // value for sliding window
......
...@@ -261,7 +261,7 @@ typedef struct SQuery { ...@@ -261,7 +261,7 @@ typedef struct SQuery {
TSKEY ekey; TSKEY ekey;
int64_t intervalTime; int64_t intervalTime;
int64_t slidingTime; // sliding time for sliding window query int64_t slidingTime; // sliding time for sliding window query
char intervalTimeUnit; // interval data type, used for daytime revise char slidingTimeUnit; // interval data type, used for daytime revise
int8_t precision; int8_t precision;
int16_t numOfOutputCols; int16_t numOfOutputCols;
int16_t interpoType; int16_t interpoType;
......
...@@ -1663,8 +1663,12 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, ...@@ -1663,8 +1663,12 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
continue; continue;
} }
if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || /*
(pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { * when the ekey equals to lastKey of current block, do NOT close it, since the interpolation may
* be involved.
*/
if ((pResult->window.ekey < lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pResult->window.skey > lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeTimeWindow(pWindowResInfo, i); closeTimeWindow(pWindowResInfo, i);
} else { } else {
skey = pResult->window.skey; skey = pResult->window.skey;
...@@ -1998,7 +2002,7 @@ static void interpolateStartKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* ...@@ -1998,7 +2002,7 @@ static void interpolateStartKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo*
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
// the queried time window and time window of data block must be intersect // the queried time window and time window of data block must be intersect
assert(win->ekey >= pBlockInfo->keyFirst && win->skey <= pBlockInfo->keyLast); // assert(win->ekey >= pBlockInfo->keyFirst && win->skey <= pBlockInfo->keyLast);
/* /*
* this win should not be the first time window that starts from a less timestamp than * this win should not be the first time window that starts from a less timestamp than
...@@ -2151,20 +2155,27 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -2151,20 +2155,27 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
// get current not closed time window // get current not closed time window
int32_t slot = pWindowResInfo->curIndex; int32_t slot = pWindowResInfo->curIndex;
if (slot != -1) { if (slot != -1) {
STimeWindow w = getWindowResult(pWindowResInfo, slot)->window; while (slot < pWindowResInfo->size) {
STimeWindow w = getWindowResult(pWindowResInfo, slot)->window;
// if current window is closed and locates before current active block, interpolate the result and close it
if (w.skey != win.skey || w.ekey != win.ekey) { // if current active window locates before current data block, do interpolate the result and close it
assert((w.ekey < win.skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) || if (w.skey != win.skey || w.ekey != win.ekey) {
(w.skey > win.ekey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery))); assert((w.skey < win.skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) ||
(w.skey > win.skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery)));
forwardStep = 0;
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep); forwardStep = 0;
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep); SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep);
closeTimeWindow(pWindowResInfo, curTimeWindow(pWindowResInfo));
closeTimeWindow(pWindowResInfo, curTimeWindow(pWindowResInfo));
} else {
break;
}
// try next time window
slot += 1;
} }
} }
...@@ -2441,6 +2452,26 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { ...@@ -2441,6 +2452,26 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
} }
} }
/*
* remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time
*/
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
int32_t i = 0;
while(i < pWindowResInfo->size &&
((pWindowResInfo->pResult[i].window.ekey < lastKey && order == QUERY_ASC_FORWARD_STEP) ||
(pWindowResInfo->pResult[i].window.skey > lastKey && order == QUERY_DESC_FORWARD_STEP))) {
++i;
}
// assert(i < pWindowResInfo->size);
if (i < pWindowResInfo->size) {
pWindowResInfo->size = (i + 1);
}
}
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) {
if (isNull(pData, type)) { // ignore the null value if (isNull(pData, type)) { // ignore the null value
return -1; return -1;
...@@ -3614,7 +3645,7 @@ void vnodeCheckIfDataExists(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, ...@@ -3614,7 +3645,7 @@ void vnodeCheckIfDataExists(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj,
void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast, void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast,
int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey) { int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey) {
assert(pKey >= keyFirst && pKey <= keyLast); assert(pKey >= keyFirst && pKey <= keyLast);
*skey = taosGetIntervalStartTimestamp(pKey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); *skey = taosGetIntervalStartTimestamp(pKey, pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision);
if (keyFirst > (INT64_MAX - pQuery->intervalTime)) { if (keyFirst > (INT64_MAX - pQuery->intervalTime)) {
/* /*
...@@ -5065,7 +5096,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery ...@@ -5065,7 +5096,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int64_t rs = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime, pQuery->intervalTimeUnit, int64_t rs = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime, pQuery->slidingTimeUnit,
pQuery->precision); pQuery->precision);
taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0); taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0);
allocMemForInterpo(pSupporter, pQuery, pMeterObj); allocMemForInterpo(pSupporter, pQuery, pMeterObj);
...@@ -5201,7 +5232,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { ...@@ -5201,7 +5232,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
} }
TSKEY revisedStime = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime, TSKEY revisedStime = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->intervalTime,
pQuery->intervalTimeUnit, pQuery->precision); pQuery->slidingTimeUnit, pQuery->precision);
taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0);
pRuntimeEnv->stableQuery = true; pRuntimeEnv->stableQuery = true;
...@@ -5647,6 +5678,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -5647,6 +5678,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
closeAllTimeWindow(&pRuntimeEnv->windowResInfo); closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step);
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1;
} else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed } else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed
SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv); SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv);
...@@ -7789,7 +7822,7 @@ bool vnodeHasRemainResults(void *handle) { ...@@ -7789,7 +7822,7 @@ bool vnodeHasRemainResults(void *handle) {
// query has completed // query has completed
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->intervalTime, TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->intervalTime,
pQuery->intervalTimeUnit, pQuery->precision); pQuery->slidingTimeUnit, pQuery->precision);
int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pRuntimeEnv->pInterpoBuf[0]->data, int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pRuntimeEnv->pInterpoBuf[0]->data,
remain, pQuery->intervalTime, ekey, pQuery->pointsToRead); remain, pQuery->intervalTime, ekey, pQuery->pointsToRead);
return numOfTotal > 0; return numOfTotal > 0;
...@@ -7900,7 +7933,7 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage ...@@ -7900,7 +7933,7 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo);
TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->intervalTime, TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->intervalTime,
pQuery->intervalTimeUnit, pQuery->precision); pQuery->slidingTimeUnit, pQuery->precision);
int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data, int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data,
numOfRows, pQuery->intervalTime, ekey, pQuery->pointsToRead); numOfRows, pQuery->intervalTime, ekey, pQuery->pointsToRead);
......
...@@ -269,7 +269,7 @@ static SQInfo *vnodeAllocateQInfoEx(SQueryMeterMsg *pQueryMsg, SSqlGroupbyExpr * ...@@ -269,7 +269,7 @@ static SQInfo *vnodeAllocateQInfoEx(SQueryMeterMsg *pQueryMsg, SSqlGroupbyExpr *
pQuery->intervalTime = pQueryMsg->intervalTime; pQuery->intervalTime = pQueryMsg->intervalTime;
pQuery->slidingTime = pQueryMsg->slidingTime; pQuery->slidingTime = pQueryMsg->slidingTime;
pQuery->interpoType = pQueryMsg->interpoType; pQuery->interpoType = pQueryMsg->interpoType;
pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit; pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
pQInfo->query.pointsToRead = vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock; pQInfo->query.pointsToRead = vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock;
......
...@@ -22,12 +22,12 @@ ...@@ -22,12 +22,12 @@
#define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC) #define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC)
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision) { int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char slidingTimeUnit, int16_t precision) {
if (timeRange == 0) { if (timeRange == 0) {
return startTime; return startTime;
} }
if (intervalTimeUnit == 'a' || intervalTimeUnit == 'm' || intervalTimeUnit == 's' || intervalTimeUnit == 'h') { if (slidingTimeUnit == 'a' || slidingTimeUnit == 'm' || slidingTimeUnit == 's' || slidingTimeUnit == 'h') {
return (startTime / timeRange) * timeRange; return (startTime / timeRange) * timeRange;
} else { } else {
/* /*
...@@ -95,11 +95,11 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD ...@@ -95,11 +95,11 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD
pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows; pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows;
} }
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision) { TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t slidingTimeUnit, int8_t precision) {
if (order == TSQL_SO_ASC) { if (order == TSQL_SO_ASC) {
return ekey; return ekey;
} else { } else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, intervalTimeUnit, precision); return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册