From 76d407395ff917e2aee084ef3b2be59626f26eff Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Fri, 11 Sep 2020 15:40:26 +0800 Subject: [PATCH] td-1245: refactor interval/sliding/offset to one struct --- src/client/inc/tscUtil.h | 1 + src/client/inc/tsclient.h | 7 +-- src/client/src/tscLocalMerge.c | 25 ++++---- src/client/src/tscSQLParser.c | 98 +++++++++++++++++++------------- src/client/src/tscServer.c | 14 +++-- src/client/src/tscStream.c | 36 ++++++------ src/client/src/tscSubquery.c | 10 +++- src/client/src/tscUtil.c | 5 +- src/inc/taosmsg.h | 7 +-- src/os/inc/osTime.h | 13 +++++ src/os/src/detail/osTime.c | 96 +++++++++++++++++++++++++++++++ src/query/inc/qExecutor.h | 5 +- src/query/inc/qFill.h | 3 +- src/query/src/qExecutor.c | 101 +++++++++++++++++---------------- src/query/src/qFill.c | 32 +++++------ src/query/src/qUtil.c | 2 +- 16 files changed, 284 insertions(+), 171 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 748b19069b..c4fba06426 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -69,6 +69,7 @@ typedef struct SJoinSupporter { SSubqueryState* pState; SSqlObj* pObj; // parent SqlObj int32_t subqueryIndex; // index of sub query + SInterval interval; SLimitVal limit; // limit info uint64_t uid; // query table uid SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d98826e339..17c4e7fc55 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -226,13 +226,8 @@ typedef struct SQueryInfo { int16_t command; // the command may be different for each subclause, so keep it seperately. uint32_t type; // query/insert type // TODO refactor - char intervalTimeUnit; - char slidingTimeUnit; - char offsetTimeUnit; STimeWindow window; // query time window - int64_t intervalTime; // aggregation time window range - int64_t slidingTime; // sliding window in mseconds - int64_t offsetTime; // start offset of each time window + SInterval interval; int32_t tz; // query client timezone SSqlGroupbyExpr groupbyExpr; // group by tags info diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index af6a546ff4..19abbe32e5 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -368,13 +368,12 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); TSKEY stime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey : pQueryInfo->window.ekey; - int64_t revisedSTime = - taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision); + int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision); if (pQueryInfo->fillType != TSDB_FILL_NONE) { SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, - 4096, (int32_t)numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, + 4096, (int32_t)numOfCols, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit, tinfo.precision, pQueryInfo->fillType, pFillCol); } } @@ -551,7 +550,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm } // primary timestamp column is involved in final result - if (pQueryInfo->intervalTime != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { numOfGroupByCols++; } @@ -568,7 +567,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm orderIdx[i] = startCols++; } - if (pQueryInfo->intervalTime != 0) { + if (pQueryInfo->interval.interval != 0) { // the first column is the timestamp, handles queries like "interval(10m) group by tags" orderIdx[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX; } @@ -612,12 +611,12 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage * super table interval query * if the order columns is the primary timestamp, all result data belongs to one group */ - assert(pQueryInfo->intervalTime > 0); + assert(pQueryInfo->interval.interval > 0); if (numOfCols == 1) { return true; } } else { // simple group by query - assert(pQueryInfo->intervalTime == 0); + assert(pQueryInfo->interval.interval == 0); } // only one row exists @@ -825,8 +824,7 @@ void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQ if (pFillInfo != NULL) { int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; - int64_t revisedSTime = - taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision); + int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision); taosResetFillInfo(pFillInfo, revisedSTime); } @@ -839,7 +837,7 @@ void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQ } static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, SQueryInfo* pQueryInfo) { - assert(pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE); + assert(pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE); tFilePage * pBeforeFillData = pLocalReducer->pResultBuf; @@ -1220,7 +1218,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur #endif // no interval query, no fill operation - if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { + if (pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { genFinalResWithoutFill(pRes, pLocalReducer, pQueryInfo); } else { SFillInfo* pFillInfo = pLocalReducer->pFillInfo; @@ -1258,13 +1256,10 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - int8_t precision = tinfo.precision; - // for group result interpolation, do not return if not data is generated if (pQueryInfo->fillType != TSDB_FILL_NONE) { TSKEY skey = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey:pQueryInfo->window.ekey;//MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey); - int64_t newTime = - taosGetIntervalStartTimestamp(skey, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision); + int64_t newTime = taosTimeTruncate(skey, &pQueryInfo->interval, tinfo.precision); taosResetFillInfo(pLocalReducer->pFillInfo, newTime); } } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 83d3bd35b2..7ed2e7dbde 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -596,18 +596,18 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ // interval is not null SStrToken* t = &pQuerySql->interval; - if (parseDuration(t->z, t->n, &pQueryInfo->intervalTime, &pQueryInfo->intervalTimeUnit) != TSDB_CODE_SUCCESS) { + if (parseDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } - if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y') { + if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') { // if the unit of time window value is millisecond, change the value from microsecond if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { - pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000; + pQueryInfo->interval.interval = pQueryInfo->interval.interval / 1000; } // interval cannot be less than 10 milliseconds - if (pQueryInfo->intervalTime < tsMinIntervalTime) { + if (pQueryInfo->interval.interval < tsMinIntervalTime) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } } @@ -641,7 +641,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ * check invalid SQL: * select tbname, tags_fields from super_table_name interval(1s) */ - if (tscQueryTags(pQueryInfo) && pQueryInfo->intervalTime > 0) { + if (tscQueryTags(pQueryInfo) && pQueryInfo->interval.interval > 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } @@ -679,34 +679,52 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ } int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { + const char* msg1 = "interval offset cannot be negative"; + const char* msg2 = "interval offset should be shorter than interval"; + const char* msg3 = "cannot use 'year' as offset when interval is 'month'"; + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - SStrToken* pOffset = &pQuerySql->offset; - if (pOffset->n == 0) { - pQueryInfo->offsetTimeUnit = pQueryInfo->offsetTimeUnit; - pQueryInfo->offsetTime = 0; + SStrToken* t = &pQuerySql->offset; + if (t->n == 0) { + pQueryInfo->interval.offsetUnit = pQueryInfo->interval.intervalUnit; + pQueryInfo->interval.offset = 0; return TSDB_CODE_SUCCESS; } - getTimestampInUsFromStr(pOffset->z, pOffset->n, &pQueryInfo->offsetTime); - if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { - pQueryInfo->offsetTime /= 1000; - } - -/* - if (pQueryInfo->offsetTime < 0) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); + if (parseDuration(t->z, t->n, &pQueryInfo->interval.offset, &pQueryInfo->interval.offsetUnit) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; } - if (pQueryInfo->slidingTime >= pQueryInfo->intervalTime) { + if (pQueryInfo->interval.offset < 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if ((pQueryInfo->intervalTime != 0) && (pQueryInfo->intervalTime/pQueryInfo->slidingTime > INTERVAL_SLIDING_FACTOR)) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + if (pQueryInfo->interval.offsetUnit != 'n' && pQueryInfo->interval.offsetUnit != 'y') { + // if the unit of time window value is millisecond, change the value from microsecond + if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { + pQueryInfo->interval.offset = pQueryInfo->interval.offset / 1000; + } + if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') { + if (pQueryInfo->interval.offset >= pQueryInfo->interval.interval) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } + } + } else if (pQueryInfo->interval.offsetUnit == pQueryInfo->interval.intervalUnit) { + if (pQueryInfo->interval.offset >= pQueryInfo->interval.interval) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } + } else if (pQueryInfo->interval.intervalUnit == 'n' && pQueryInfo->interval.offsetUnit == 'y') { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + } else if (pQueryInfo->interval.intervalUnit == 'y' && pQueryInfo->interval.offsetUnit == 'n') { + if (pQueryInfo->interval.interval * 12 <= pQueryInfo->interval.offset) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } + } else { + // TODO: offset should be shorter than interval, but how to check + // conflicts like 30days offset and 1 month interval } - */ return TSDB_CODE_SUCCESS; } @@ -724,29 +742,29 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu SStrToken* pSliding = &pQuerySql->sliding; if (pSliding->n == 0) { - pQueryInfo->slidingTimeUnit = pQueryInfo->intervalTimeUnit; - pQueryInfo->slidingTime = pQueryInfo->intervalTime; + pQueryInfo->interval.slidingUnit = pQueryInfo->interval.intervalUnit; + pQueryInfo->interval.sliding = pQueryInfo->interval.interval; return TSDB_CODE_SUCCESS; } - if (pQueryInfo->intervalTimeUnit == 'n' || pQueryInfo->intervalTimeUnit == 'y') { + if (pQueryInfo->interval.intervalUnit == 'n' || pQueryInfo->interval.intervalUnit == 'y') { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } - getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->slidingTime); + getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->interval.sliding); if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { - pQueryInfo->slidingTime /= 1000; + pQueryInfo->interval.sliding /= 1000; } - if (pQueryInfo->slidingTime < tsMinSlidingTime) { + if (pQueryInfo->interval.sliding < tsMinSlidingTime) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); } - if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) { + if (pQueryInfo->interval.sliding > pQueryInfo->interval.interval) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if ((pQueryInfo->intervalTime != 0) && (pQueryInfo->intervalTime/pQueryInfo->slidingTime > INTERVAL_SLIDING_FACTOR)) { + if ((pQueryInfo->interval.interval != 0) && (pQueryInfo->interval.interval/pQueryInfo->interval.sliding > INTERVAL_SLIDING_FACTOR)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -4758,9 +4776,9 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { const char* msg0 = "sample interval can not be less than 10ms."; const char* msg1 = "functions not allowed in select clause"; - if (pQueryInfo->intervalTime != 0 && pQueryInfo->intervalTime < 10 && - pQueryInfo->intervalTimeUnit != 'n' && - pQueryInfo->intervalTimeUnit != 'y') { + if (pQueryInfo->interval.interval != 0 && pQueryInfo->interval.interval < 10 && + pQueryInfo->interval.intervalUnit != 'n' && + pQueryInfo->interval.intervalUnit != 'y') { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); } @@ -5545,7 +5563,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo insertResultField(pQueryInfo, (int32_t)size, &ids, bytes, (int8_t)type, name, pExpr); } else { // if this query is "group by" normal column, interval is not allowed - if (pQueryInfo->intervalTime > 0) { + if (pQueryInfo->interval.interval > 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -5578,7 +5596,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { // only retrieve tags, group by is not supportted if (tscQueryTags(pQueryInfo)) { - if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || pQueryInfo->intervalTime > 0) { + if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || pQueryInfo->interval.interval > 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); } else { return TSDB_CODE_SUCCESS; @@ -6030,7 +6048,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { if (parseIntervalClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } else { - if ((pQueryInfo->intervalTime > 0) && + if ((pQueryInfo->interval.interval > 0) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -6060,7 +6078,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { * not here. */ if (pQuerySql->fillType != NULL) { - if (pQueryInfo->intervalTime == 0) { + if (pQueryInfo->interval.interval == 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -6228,7 +6246,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { if (parseIntervalClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } else { - if ((pQueryInfo->intervalTime > 0) && + if ((pQueryInfo->interval.interval > 0) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -6279,14 +6297,14 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { * the columns may be increased due to group by operation */ if (pQuerySql->fillType != NULL) { - if (pQueryInfo->intervalTime == 0 && (!tscIsPointInterpQuery(pQueryInfo))) { + if (pQueryInfo->interval.interval == 0 && (!tscIsPointInterpQuery(pQueryInfo))) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } - if (pQueryInfo->intervalTime > 0 && pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y') { + if (pQueryInfo->interval.interval > 0 && pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') { int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey); // number of result is not greater than 10,000,000 - if ((timeRange == 0) || (timeRange / pQueryInfo->intervalTime) > MAX_INTERVAL_TIME_WINDOW) { + if ((timeRange == 0) || (timeRange / pQueryInfo->interval.interval) > MAX_INTERVAL_TIME_WINDOW) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5639719c15..c6e9cbafd7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -647,8 +647,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_TSC_INVALID_SQL; } - if (pQueryInfo->intervalTime < 0) { - tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime); + if (pQueryInfo->interval.interval < 0) { + tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->interval.interval); return TSDB_CODE_TSC_INVALID_SQL; } @@ -675,10 +675,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); pQueryMsg->numOfCols = htons((int16_t)taosArrayGetSize(pQueryInfo->colList)); - pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); - pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); - pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit; - pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit; + pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval); + pQueryMsg->interval.sliding = htobe64(pQueryInfo->interval.sliding); + pQueryMsg->interval.offset = htobe64(pQueryInfo->interval.offset); + pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit; + pQueryMsg->interval.slidingUnit = pQueryInfo->interval.slidingUnit; + pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit; pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 71ba76dc6c..99669ecd89 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -400,43 +400,43 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y' && pQueryInfo->intervalTime < minIntervalTime) { + if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.interval < minIntervalTime) { tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64, pSql, pStream, - pQueryInfo->intervalTime, minIntervalTime); - pQueryInfo->intervalTime = minIntervalTime; + pQueryInfo->interval.interval, minIntervalTime); + pQueryInfo->interval.interval = minIntervalTime; } - pStream->intervalTimeUnit = pQueryInfo->intervalTimeUnit; - pStream->intervalTime = pQueryInfo->intervalTime; // it shall be derived from sql string + pStream->intervalTimeUnit = pQueryInfo->interval.intervalUnit; + pStream->intervalTime = pQueryInfo->interval.interval; // it shall be derived from sql string - if (pQueryInfo->slidingTime <= 0) { - pQueryInfo->slidingTime = pQueryInfo->intervalTime; - pQueryInfo->slidingTimeUnit = pQueryInfo->intervalTimeUnit; + if (pQueryInfo->interval.sliding <= 0) { + pQueryInfo->interval.sliding = pQueryInfo->interval.interval; + pQueryInfo->interval.slidingUnit = pQueryInfo->interval.intervalUnit; } int64_t minSlidingTime = (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime; - if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y' && pQueryInfo->slidingTime < minSlidingTime) { + if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.sliding < minSlidingTime) { tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream, - pQueryInfo->slidingTime, minSlidingTime); + pQueryInfo->interval.sliding, minSlidingTime); - pQueryInfo->slidingTime = minSlidingTime; + pQueryInfo->interval.sliding = minSlidingTime; } - if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) { + if (pQueryInfo->interval.sliding > pQueryInfo->interval.interval) { tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64, pSql, pStream, - pQueryInfo->slidingTime, pQueryInfo->intervalTime); + pQueryInfo->interval.sliding, pQueryInfo->interval.interval); - pQueryInfo->slidingTime = pQueryInfo->intervalTime; + pQueryInfo->interval.sliding = pQueryInfo->interval.interval; } - pStream->slidingTimeUnit = pQueryInfo->slidingTimeUnit; - pStream->slidingTime = pQueryInfo->slidingTime; + pStream->slidingTimeUnit = pQueryInfo->interval.slidingUnit; + pStream->slidingTime = pQueryInfo->interval.sliding; if (pStream->isProject) { - pQueryInfo->intervalTime = 0; // clear the interval value to avoid the force time window split by query processor - pQueryInfo->slidingTime = 0; + pQueryInfo->interval.interval = 0; // clear the interval value to avoid the force time window split by query processor + pQueryInfo->interval.sliding = 0; } } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 4246abf52d..c13e864293 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -113,7 +113,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the * final results which is acquired after the secondry merge of in the client. */ - if (pLimit->offset == 0 || pQueryInfo->intervalTime > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { + if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { if (win->skey > elem1.ts) { win->skey = elem1.ts; } @@ -178,6 +178,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in pSupporter->subqueryIndex = index; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + memcpy(&pSupporter->interval, &pQueryInfo->interval, sizeof(pSupporter->interval)); pSupporter->limit = pQueryInfo->limit; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index); @@ -307,6 +308,9 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // set the second stage sub query for join process TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE); + memcpy(&pQueryInfo->interval, &pSupporter->interval, sizeof(pQueryInfo->interval)); + pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; + tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); pQueryInfo->colList = pSupporter->colList; @@ -1204,7 +1208,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter } pNew->cmd.numOfCols = 0; - pNewQueryInfo->intervalTime = 0; + pNewQueryInfo->interval.interval = 0; pSupporter->limit = pNewQueryInfo->limit; pNewQueryInfo->limit.limit = -1; @@ -2185,7 +2189,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) { } // primary key column cannot be null in interval query, no need to check - if (i == 0 && pQueryInfo->intervalTime > 0) { + if (i == 0 && pQueryInfo->interval.interval > 0) { continue; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9b3f5927ab..cc0b844a8a 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1899,10 +1899,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pNewQueryInfo->command = pQueryInfo->command; - pNewQueryInfo->intervalTimeUnit = pQueryInfo->intervalTimeUnit; - pNewQueryInfo->slidingTimeUnit = pQueryInfo->slidingTimeUnit; - pNewQueryInfo->intervalTime = pQueryInfo->intervalTime; - pNewQueryInfo->slidingTime = pQueryInfo->slidingTime; + memcpy(&pNewQueryInfo->interval, &pQueryInfo->interval, sizeof(pNewQueryInfo->interval)); pNewQueryInfo->type = pQueryInfo->type; pNewQueryInfo->window = pQueryInfo->window; pNewQueryInfo->limit = pQueryInfo->limit; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 94ac5dc995..e49e2caca1 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -460,12 +460,7 @@ typedef struct { int16_t order; int16_t orderColId; int16_t numOfCols; // the number of columns will be load from vnode - int64_t intervalTime; // time interval for aggregation, in million second - int64_t slidingTime; // value for sliding window - int64_t offsetTime; // start offset for interval query - char intervalTimeUnit; - char slidingTimeUnit; // time interval type, for revisement of interval(1d) - char offsetTimeUnit; + SInterval interval; uint16_t tagCondLen; // tag length in current query int16_t numOfGroupCols; // num of group by columns int16_t orderByIdx; diff --git a/src/os/inc/osTime.h b/src/os/inc/osTime.h index 97432ca241..c03fc1c36f 100644 --- a/src/os/inc/osTime.h +++ b/src/os/inc/osTime.h @@ -63,6 +63,19 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { } } + +typedef struct SInterval { + char intervalUnit; + char slidingUnit; + char offsetUnit; + int64_t interval; + int64_t sliding; + int64_t offset; +} SInterval; + +int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); +int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision); + int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts); int32_t parseDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit); diff --git a/src/os/src/detail/osTime.c b/src/os/src/detail/osTime.c index 9d8328a71b..c9c6130498 100644 --- a/src/os/src/detail/osTime.c +++ b/src/os/src/detail/osTime.c @@ -403,6 +403,102 @@ int32_t parseDuration(const char* token, int32_t tokenLen, int64_t* duration, ch return getTimestampInUsFromStrImpl(*duration, *unit, duration); } +int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { + if (duration == 0) { + return t; + } + if (unit == 'y') { + duration *= 12; + } else if (unit != 'n') { + return t + duration; + } + + t /= 1000; + if (precision == TSDB_TIME_PRECISION_MICRO) { + t /= 1000; + } + + struct tm tm; + time_t tt = (time_t)t; + localtime_r(&tt, &tm); + int mon = tm.tm_year * 12 + tm.tm_mon + (int)duration; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + + t = mktime(&tm) * 1000L; + if (precision == TSDB_TIME_PRECISION_MICRO) { + t *= 1000L; + } + + return t; +} + +int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision) { + if (pInterval->sliding == 0) { + assert(pInterval->interval == 0); + return t; + } + + int64_t start = t; + if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + start /= 1000; + if (precision == TSDB_TIME_PRECISION_MICRO) { + start /= 1000; + } + struct tm tm; + time_t t = (time_t)start; + localtime_r(&t, &tm); + tm.tm_sec = 0; + tm.tm_min = 0; + tm.tm_hour = 0; + tm.tm_mday = 1; + + if (pInterval->slidingUnit == 'y') { + tm.tm_mon = 0; + tm.tm_year = (int)(tm.tm_year / pInterval->sliding * pInterval->sliding); + } else { + int mon = tm.tm_year * 12 + tm.tm_mon; + mon = (int)(mon / pInterval->sliding * pInterval->sliding); + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + } + + start = mktime(&tm) * 1000L; + if (precision == TSDB_TIME_PRECISION_MICRO) { + start *= 1000L; + } + } else { + int64_t delta = t - pInterval->interval; + int32_t factor = delta > 0 ? 1 : -1; + + start = (delta / pInterval->sliding + factor) * pInterval->sliding; + + if (pInterval->intervalUnit == 'd' || pInterval->intervalUnit == 'w') { + /* + * here we revised the start time of day according to the local time zone, + * but in case of DST, the start time of one day need to be dynamically decided. + */ + // todo refactor to extract function that is available for Linux/Windows/Mac platform + #if defined(WINDOWS) && _MSC_VER >= 1900 + // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 + int64_t timezone = _timezone; + int32_t daylight = _daylight; + char** tzname = _tzname; + #endif + + int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L; + start += timezone * t; + } + + int64_t end = start + pInterval->interval - 1; + if (end < t) { + start += pInterval->sliding; + } + } + + return taosTimeAdd(start, pInterval->offset, pInterval->intervalUnit, precision); +} + // internal function, when program is paused in debugger, // one can call this function from debugger to print a // timestamp as human readable string, for example (gdb): diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 169bf907c6..5d570821cb 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -132,12 +132,9 @@ typedef struct SQueryCostInfo { typedef struct SQuery { int16_t numOfCols; int16_t numOfTags; - char intervalTimeUnit; - char slidingTimeUnit; // interval data type, used for daytime revise SOrderVal order; STimeWindow window; - int64_t intervalTime; - int64_t slidingTime; // sliding time for sliding window query + SInterval interval; int16_t precision; int16_t numOfOutput; int16_t fillType; diff --git a/src/query/inc/qFill.h b/src/query/inc/qFill.h index 6b8dcb0bf9..6d44fee095 100644 --- a/src/query/inc/qFill.h +++ b/src/query/inc/qFill.h @@ -51,12 +51,11 @@ typedef struct SFillInfo { int32_t rowSize; // size of each row // char ** pTags; // tags value for current interpolation SFillTagColInfo* pTags; // tags value for filling gap - int64_t slidingTime; // sliding value to determine the number of result for a given time window + SInterval interval; char * prevValues; // previous row of data, to generate the interpolation results char * nextValues; // next row of data char** pData; // original result data block involved in filling data int32_t capacityInRows; // data buffer size in rows - int8_t slidingUnit; // sliding time unit int8_t precision; // time resoluation SFillColInfo* pFillCol; // column info for fill operations } SFillInfo; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index b873714c49..64d05f2205 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -131,21 +131,21 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { static void setQueryStatus(SQuery *pQuery, int8_t status); static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv); -#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0) +#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (pQuery->intervalTimeUnit != 'n' && pQuery->intervalTimeUnit != 'y') { - tw->skey += pQuery->slidingTime * factor; - tw->ekey = tw->skey + pQuery->intervalTime - 1; + if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') { + tw->skey += pQuery->interval.sliding * factor; + tw->ekey = tw->skey + pQuery->interval.interval - 1; return; } - int64_t key = tw->skey / 1000, interval = pQuery->intervalTime; + int64_t key = tw->skey / 1000, interval = pQuery->interval.interval; if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) { key /= 1000; } - if (pQuery->intervalTimeUnit == 'y') { + if (pQuery->interval.intervalUnit == 'y') { interval *= 12; } @@ -510,10 +510,10 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value w.skey = pWindowResInfo->prevSKey; - if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { - w.ekey = taosAddNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') { + w.ekey = taosTimeAdd(w.skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision); } else { - w.ekey = w.skey + pQuery->intervalTime - 1; + w.ekey = w.skey + pQuery->interval.interval - 1; } } else { int32_t slot = curTimeWindowIndex(pWindowResInfo); @@ -522,23 +522,23 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t } if (w.skey > ts || w.ekey < ts) { - if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { - w.skey = taosGetIntervalStartTimestamp(ts, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); - w.ekey = taosAddNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') { + w.skey = taosTimeTruncate(ts, &pQuery->interval, pQuery->precision); + w.ekey = taosTimeAdd(w.skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1; } else { int64_t st = w.skey; if (st > ts) { - st -= ((st - ts + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; + st -= ((st - ts + pQuery->interval.sliding - 1) / pQuery->interval.sliding) * pQuery->interval.sliding; } - int64_t et = st + pQuery->intervalTime - 1; + int64_t et = st + pQuery->interval.interval - 1; if (et < ts) { - st += ((ts - et + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; + st += ((ts - et + pQuery->interval.sliding - 1) / pQuery->interval.sliding) * pQuery->interval.sliding; } w.skey = st; - w.ekey = w.skey + pQuery->intervalTime - 1; + w.ekey = w.skey + pQuery->interval.interval - 1; } } @@ -848,7 +848,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow int32_t startPos = 0; // tumbling time window query, a special case of sliding time window query - if (pQuery->slidingTime == pQuery->intervalTime && prevPosition != -1) { + if (pQuery->interval.sliding == pQuery->interval.interval && prevPosition != -1) { int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); startPos = prevPosition + factor; } else { @@ -861,21 +861,21 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow */ if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNext->ekey) { TSKEY next = primaryKeys[startPos]; - if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { - pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); - pNext->ekey = taosAddNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') { + pNext->skey = taosTimeTruncate(next, &pQuery->interval, pQuery->precision); + pNext->ekey = taosTimeAdd(pNext->skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1; } else { - pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime; - pNext->skey = pNext->ekey - pQuery->intervalTime + 1; + pNext->ekey += ((next - pNext->ekey + pQuery->interval.sliding - 1)/pQuery->interval.sliding) * pQuery->interval.sliding; + pNext->skey = pNext->ekey - pQuery->interval.interval + 1; } } else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNext->skey) { TSKEY next = primaryKeys[startPos]; - if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { - pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); - pNext->ekey = taosAddNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') { + pNext->skey = taosTimeTruncate(next, &pQuery->interval, pQuery->precision); + pNext->ekey = taosTimeAdd(pNext->skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1; } else { - pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; - pNext->ekey = pNext->skey + pQuery->intervalTime - 1; + pNext->skey -= ((pNext->skey - next + pQuery->interval.sliding - 1) / pQuery->interval.sliding) * pQuery->interval.sliding; + pNext->ekey = pNext->skey + pQuery->interval.interval - 1; } } @@ -1871,20 +1871,20 @@ static bool onlyQueryTags(SQuery* pQuery) { ///////////////////////////////////////////////////////////////////////////////////////////// void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) { - assert(key >= keyFirst && key <= keyLast && pQuery->slidingTime <= pQuery->intervalTime); - win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->intervalTime, pQuery->slidingTimeUnit, pQuery->precision); + assert(key >= keyFirst && key <= keyLast && pQuery->interval.sliding <= pQuery->interval.interval); + win->skey = taosTimeTruncate(key, &pQuery->interval, pQuery->precision); /* - * if the realSkey > INT64_MAX - pQuery->intervalTime, the query duration between + * if the realSkey > INT64_MAX - pQuery->interval.interval, the query duration between * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges. */ - if (keyFirst > (INT64_MAX - pQuery->intervalTime)) { - assert(keyLast - keyFirst < pQuery->intervalTime); + if (keyFirst > (INT64_MAX - pQuery->interval.interval)) { + assert(keyLast - keyFirst < pQuery->interval.interval); win->ekey = INT64_MAX; - } else if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { - win->ekey = taosAddNatualInterval(win->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + } else if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') { + win->ekey = taosTimeAdd(win->skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1; } else { - win->ekey = win->skey + pQuery->intervalTime - 1; + win->ekey = win->skey + pQuery->interval.interval - 1; } } @@ -1998,7 +1998,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo return; } - if (isPointInterpoQuery(pQuery) && pQuery->intervalTime == 0) { + if (isPointInterpoQuery(pQuery) && pQuery->interval.interval == 0) { if (!QUERY_IS_ASC_QUERY(pQuery)) { qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); @@ -2009,7 +2009,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo return; } - if (pQuery->intervalTime == 0) { + if (pQuery->interval.interval == 0) { if (onlyFirstQuery(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) { qDebug(msg, GET_QINFO_ADDR(pQuery), "only-first", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, @@ -4269,8 +4269,8 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { } /* - * 1. for interval without interpolation query we forward pQuery->intervalTime at a time for - * pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is + * 1. for interval without interpolation query we forward pQuery->interval.interval at a time for + * pQuery->limit.offset times. Since hole exists, pQuery->interval.interval*pQuery->limit.offset value is * not valid. otherwise, we only forward pQuery->limit.offset number of points */ assert(pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL); @@ -4569,7 +4569,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w); pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, pQuery->numOfOutput, - pQuery->slidingTime, pQuery->slidingTimeUnit, (int8_t)pQuery->precision, + pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision, pQuery->fillType, pColInfo); } @@ -5430,7 +5430,7 @@ static void stableQueryImpl(SQInfo *pQInfo) { (isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pRuntimeEnv->groupbyNormalCol))) { multiTableQueryProcess(pQInfo); } else { - assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) || + assert((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol); sequentialTableProcess(pQInfo); @@ -5476,8 +5476,8 @@ bool validateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SC } static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) { - if (pQueryMsg->intervalTime < 0) { - qError("qmsg:%p illegal value of interval time %" PRId64, pQueryMsg, pQueryMsg->intervalTime); + if (pQueryMsg->interval.interval < 0) { + qError("qmsg:%p illegal value of interval time %" PRId64, pQueryMsg, pQueryMsg->interval.interval); return false; } @@ -5556,8 +5556,12 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey); - pQueryMsg->intervalTime = htobe64(pQueryMsg->intervalTime); - pQueryMsg->slidingTime = htobe64(pQueryMsg->slidingTime); + pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval); + pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding); + pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset); + pQueryMsg->interval.intervalUnit = pQueryMsg->interval.intervalUnit; + pQueryMsg->interval.slidingUnit = pQueryMsg->interval.slidingUnit; + pQueryMsg->interval.offsetUnit = pQueryMsg->interval.offsetUnit; pQueryMsg->limit = htobe64(pQueryMsg->limit); pQueryMsg->offset = htobe64(pQueryMsg->offset); @@ -5770,7 +5774,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, - pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime, + pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->interval.interval, pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset); return TSDB_CODE_SUCCESS; @@ -6109,10 +6113,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQuery->order.orderColId = pQueryMsg->orderColId; pQuery->pSelectExpr = pExprs; pQuery->pGroupbyExpr = pGroupbyExpr; - pQuery->intervalTime = pQueryMsg->intervalTime; - pQuery->slidingTime = pQueryMsg->slidingTime; - pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit; - pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; + memcpy(&pQuery->interval, &pQueryMsg->interval, sizeof(pQuery->interval)); pQuery->fillType = pQueryMsg->fillType; pQuery->numOfTags = pQueryMsg->numOfTags; pQuery->tagColList = pTagCols; diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index ddb63c5012..6ae36a160d 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -38,8 +38,8 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ pFillInfo->numOfTags = numOfTags; pFillInfo->numOfCols = numOfCols; pFillInfo->precision = precision; - pFillInfo->slidingTime = slidingTime; - pFillInfo->slidingUnit = slidingUnit; + pFillInfo->interval.sliding = slidingTime; + pFillInfo->interval.slidingUnit = slidingUnit; pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); if (numOfTags > 0) { @@ -121,7 +121,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) return; } - pFillInfo->endKey = taosGetRevisedEndKey(endKey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit, + pFillInfo->endKey = taosGetRevisedEndKey(endKey, pFillInfo->order, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->precision); pFillInfo->rowIdx = 0; @@ -172,17 +172,17 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows int32_t numOfRows = taosNumOfRemainRows(pFillInfo); - TSKEY ekey1 = taosGetRevisedEndKey(ekey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit, + TSKEY ekey1 = taosGetRevisedEndKey(ekey, pFillInfo->order, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->precision); int64_t numOfRes = -1; if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set. TSKEY lastKey = tsList[pFillInfo->numOfRows - 1]; - if (pFillInfo->slidingUnit != 'y' && pFillInfo->slidingUnit != 'n') { - numOfRes = (int64_t)(ABS(lastKey - pFillInfo->start) / pFillInfo->slidingTime) + 1; + if (pFillInfo->interval.slidingUnit != 'y' && pFillInfo->interval.slidingUnit != 'n') { + numOfRes = (int64_t)(ABS(lastKey - pFillInfo->start) / pFillInfo->interval.sliding) + 1; } else { - numOfRes = taosCountNatualInterval(lastKey, pFillInfo->start, pFillInfo->slidingTime, pFillInfo->slidingUnit, pFillInfo->precision) + 1; + numOfRes = taosCountNatualInterval(lastKey, pFillInfo->start, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->precision) + 1; } assert(numOfRes >= numOfRows); } else { // reach the end of data @@ -191,10 +191,10 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows return 0; } // the numOfRes rows are all filled with specified policy - if (pFillInfo->slidingUnit != 'y' && pFillInfo->slidingUnit != 'n') { - numOfRes = (ABS(ekey1 - pFillInfo->start) / pFillInfo->slidingTime) + 1; + if (pFillInfo->interval.slidingUnit != 'y' && pFillInfo->interval.slidingUnit != 'n') { + numOfRes = (ABS(ekey1 - pFillInfo->start) / pFillInfo->interval.sliding) + 1; } else { - numOfRes = taosCountNatualInterval(ekey1, pFillInfo->start, pFillInfo->slidingTime, pFillInfo->slidingUnit, pFillInfo->precision) + 1; + numOfRes = taosCountNatualInterval(ekey1, pFillInfo->start, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->precision) + 1; } } @@ -375,10 +375,10 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu } // TODO natual sliding time - if (pFillInfo->slidingUnit != 'n' && pFillInfo->slidingUnit != 'y') { - pFillInfo->start += (pFillInfo->slidingTime * step); + if (pFillInfo->interval.slidingUnit != 'n' && pFillInfo->interval.slidingUnit != 'y') { + pFillInfo->start += (pFillInfo->interval.sliding * step); } else { - pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->slidingTime*step, pFillInfo->slidingUnit, pFillInfo->precision); + pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->interval.sliding*step, pFillInfo->interval.slidingUnit, pFillInfo->precision); } pFillInfo->numOfCurrent++; @@ -487,10 +487,10 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu setTagsValue(pFillInfo, data, num); // TODO natual sliding time - if (pFillInfo->slidingUnit != 'n' && pFillInfo->slidingUnit != 'y') { - pFillInfo->start += (pFillInfo->slidingTime * step); + if (pFillInfo->interval.slidingUnit != 'n' && pFillInfo->interval.slidingUnit != 'y') { + pFillInfo->start += (pFillInfo->interval.sliding * step); } else { - pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->slidingTime*step, pFillInfo->slidingUnit, pFillInfo->precision); + pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->interval.sliding*step, pFillInfo->interval.slidingUnit, pFillInfo->precision); } pFillInfo->rowIdx += 1; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index c195a0b76c..2bd92c74a4 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -54,7 +54,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun return TSDB_CODE_QRY_OUT_OF_MEMORY; } - pWindowResInfo->interval = pRuntimeEnv->pQuery->intervalTime; + pWindowResInfo->interval = pRuntimeEnv->pQuery->interval.interval; pSummary->internalSupSize += sizeof(SWindowResult) * threshold; pSummary->internalSupSize += (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * pWindowResInfo->capacity; -- GitLab