diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index f77897a74b77dfc9d168698bde927197eee8e9cc..46a576fa9aba85f12a91919d04ae2658d13f905c 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -70,6 +70,8 @@ typedef struct SJoinSupporter { SSubqueryState* pState; SSqlObj* pObj; // parent SqlObj int32_t subqueryIndex; // index of sub query + char intervalTimeUnit; + char slidingTimeUnit; int64_t intervalTime; // interval time int64_t slidingTime; // sliding time SLimitVal limit; // limit info diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 1fb8594588e05f533a3fb8b5458e3d5179e18560..3e99b644bd42d43c7288cbab2ed82ec35d84b90c 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4681,7 +4681,9 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { const char* msg0 = "sample interval can not be less than 10ms."; const char* msg1 = "functions not allowed in select clause"; - if (pQueryInfo->intervalTime != 0 && pQueryInfo->intervalTime < 10) { + if (pQueryInfo->intervalTime != 0 && pQueryInfo->intervalTime < 10 && + pQueryInfo->intervalTimeUnit != 'n' && + pQueryInfo->intervalTimeUnit != 'y') { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); } @@ -6167,7 +6169,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } - if (pQueryInfo->intervalTime > 0) { + if (pQueryInfo->intervalTime > 0 && pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y') { int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey); // number of result is not greater than 10,000,000 if ((timeRange == 0) || (timeRange / pQueryInfo->intervalTime) > MAX_INTERVAL_TIME_WINDOW) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ecb85472fc79a5a05f6ba3967624265b5155b83a..b36767dbb49d55be091784c6a5addb530ed19062 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -673,6 +673,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->numOfCols = htons((int16_t)taosArrayGetSize(pQueryInfo->colList)); pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); + pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit; pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit; pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->numOfTags = htonl(numOfTags); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 2fb264c75652f2a22f2bab441cd241068bd58c54..7a626bfe5c629ffc2df0b4dd4aa4ef79c92bf3ef 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -178,6 +178,8 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in pSupporter->subqueryIndex = index; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + pSupporter->intervalTimeUnit = pQueryInfo->intervalTimeUnit; + pSupporter->slidingTime = pQueryInfo->slidingTimeUnit; pSupporter->intervalTime = pQueryInfo->intervalTime; pSupporter->slidingTime = pQueryInfo->slidingTime; pSupporter->limit = pQueryInfo->limit; @@ -309,6 +311,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // set the second stage sub query for join process TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE); + pQueryInfo->intervalTimeUnit = pSupporter->intervalTimeUnit; + pQueryInfo->slidingTimeUnit = pSupporter->slidingTimeUnit; pQueryInfo->intervalTime = pSupporter->intervalTime; pQueryInfo->slidingTime = pSupporter->slidingTime; pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b61fd7e8c949eff743337c552876c1770f9f6d0a..49f7c91397d793c6795a599078c1ea7b7e490de7 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1830,6 +1830,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pNewQueryInfo->command = pQueryInfo->command; + pNewQueryInfo->intervalTimeUnit = pQueryInfo->intervalTimeUnit; pNewQueryInfo->slidingTimeUnit = pQueryInfo->slidingTimeUnit; pNewQueryInfo->intervalTime = pQueryInfo->intervalTime; pNewQueryInfo->slidingTime = pQueryInfo->slidingTime; diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 01945dbb0056ed2712f2f439753cdcfe3c9601fb..0c89d26bedd61a3d1b8d19bbc6c13469ef8af961 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -104,29 +104,60 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in if (slidingTime == 0) { return startTime; } + int64_t start = startTime; + if (timeUnit == 'n' || timeUnit == 'y') { + start /= 1000; + if (precision == TSDB_TIME_PRECISION_MICRO) { + start /= 1000; + } + struct tm tm; + time_t t = (time_t)start; + localtime_r(&t, &tm); + tm.tm_sec = 0; + tm.tm_min = 0; + tm.tm_hour = 0; + tm.tm_mday = 1; + + if (timeUnit == 'y') { + tm.tm_mon = 0; + tm.tm_year = tm.tm_year / slidingTime * slidingTime; + } else { + int mon = tm.tm_year * 12 + tm.tm_mon; + mon = mon / slidingTime * slidingTime; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + } - int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime; - if (!(timeUnit == 'u' || timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) { - /* - * here we revised the start time of day according to the local time zone, - * but in case of DST, the start time of one day need to be dynamically decided. - */ - // todo refactor to extract function that is available for Linux/Windows/Mac platform -#if defined(WINDOWS) && _MSC_VER >= 1900 - // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 - int64_t timezone = _timezone; - int32_t daylight = _daylight; - char** tzname = _tzname; -#endif - - int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L; - start += timezone * t; - } + start = mktime(&tm) * 1000L; + if (precision == TSDB_TIME_PRECISION_MICRO) { + start *= 1000L; + } + } else { + start = ((start - intervalTime) / slidingTime + 1) * slidingTime; + + if (timeUnit == 'd' || timeUnit == 'w') { + /* + * here we revised the start time of day according to the local time zone, + * but in case of DST, the start time of one day need to be dynamically decided. + */ + // todo refactor to extract function that is available for Linux/Windows/Mac platform + #if defined(WINDOWS) && _MSC_VER >= 1900 + // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 + int64_t timezone = _timezone; + int32_t daylight = _daylight; + char** tzname = _tzname; + #endif + + int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L; + start += timezone * t; + } - int64_t end = start + intervalTime - 1; - if (end < startTime) { - start += slidingTime; + int64_t end = start + intervalTime - 1; + if (end < startTime) { + start += slidingTime; + } } + return start; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 761a267ce53f9b2c54e0ea993ebd346564feee29..0fe63a740eecca26e4e8ffc639b8ea067aa3e37e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -456,6 +456,7 @@ typedef struct { int64_t intervalTime; // time interval for aggregation, in million second int64_t intervalOffset; // start offset for interval query int64_t slidingTime; // value for sliding window + char intervalTimeUnit; char slidingTimeUnit; // time interval type, for revisement of interval(1d) uint16_t tagCondLen; // tag length in current query int16_t numOfGroupCols; // num of group by columns diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 7093495763a76ac4b9d9f86f413fbbc27d273b2a..25fb04fb9a56bd9218b37f10cb5577d00a7fbd51 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -132,11 +132,12 @@ typedef struct SQueryCostInfo { typedef struct SQuery { int16_t numOfCols; int16_t numOfTags; + char intervalTimeUnit; + char slidingTimeUnit; // interval data type, used for daytime revise SOrderVal order; STimeWindow window; int64_t intervalTime; int64_t slidingTime; // sliding time for sliding window query - char slidingTimeUnit; // interval data type, used for daytime revise int16_t precision; int16_t numOfOutput; int16_t fillType; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 4e2e31d2694712b14ed6d526a1ee960c0163398b..6c4046d7766d5bb3d9c9107c149b398ecae74a2d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -137,13 +137,75 @@ static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv); #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0) -// previous time window may not be of the same size of pQuery->intervalTime -#define GET_NEXT_TIMEWINDOW(_q, tw) \ - do { \ - int32_t factor = GET_FORWARD_DIRECTION_FACTOR((_q)->order.order); \ - (tw)->skey += ((_q)->slidingTime * factor); \ - (tw)->ekey = (tw)->skey + ((_q)->intervalTime - 1); \ - } while (0) +static int64_t addNatualInterval(int64_t key, int64_t intervalTime, char intervalTimeUnit, int precision) { + key /= 1000; + if (precision == TSDB_TIME_PRECISION_MICRO) { + key /= 1000; + } + + struct tm tm; + time_t t = (time_t)key; + localtime_r(&t, &tm); + + if (intervalTimeUnit == 'y') { + intervalTime *= 12; + } + + int mon = tm.tm_year * 12 + tm.tm_mon + intervalTime; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + + key = mktime(&tm) * 1000L; + + if (precision == TSDB_TIME_PRECISION_MICRO) { + key *= 1000L; + } + + return key; +} + +static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { + int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + if (pQuery->intervalTimeUnit != 'n' && pQuery->intervalTimeUnit != 'y') { + tw->skey += pQuery->slidingTime * factor; + tw->ekey = tw->skey + pQuery->intervalTime - 1; + return; + } + + int64_t key = tw->skey; + key /= 1000; + if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) { + key /= 1000; + } + + struct tm tm; + time_t t = (time_t)key; + localtime_r(&t, &tm); + + if (pQuery->intervalTimeUnit == 'y') { + factor *= 12; + } + + int mon = tm.tm_year * 12 + tm.tm_mon; + mon += pQuery->intervalTime * factor; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + + tw->skey = mktime(&tm) * 1000L; + + mon += pQuery->intervalTime * factor; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + tw->ekey = mktime(&tm) * 1000L; + + if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) { + tw->skey *= 1000L; + tw->ekey *= 1000L; + } + tw->ekey -= 1; +} + +#define GET_NEXT_TIMEWINDOW(_q, tw) getNextTimeWindow((_q), (tw)) #define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables)) #define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables)) @@ -467,9 +529,13 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) { STimeWindow w = {0}; - if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value + if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value w.skey = pWindowResInfo->prevSKey; - w.ekey = w.skey + pQuery->intervalTime - 1; + if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { + w.ekey = addNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + } else { + w.ekey = w.skey + pQuery->intervalTime - 1; + } } else { int32_t slot = curTimeWindowIndex(pWindowResInfo); SWindowResult* pWindowRes = getWindowResult(pWindowResInfo, slot); @@ -477,19 +543,24 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t } if (w.skey > ts || w.ekey < ts) { - int64_t st = w.skey; + if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { + w.skey = taosGetIntervalStartTimestamp(ts, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); + w.ekey = addNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + } else { + int64_t st = w.skey; - if (st > ts) { - st -= ((st - ts + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; - } + if (st > ts) { + st -= ((st - ts + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; + } - int64_t et = st + pQuery->intervalTime - 1; - if (et < ts) { - st += ((ts - et + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; - } + int64_t et = st + pQuery->intervalTime - 1; + if (et < ts) { + st += ((ts - et + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; + } - w.skey = st; - w.ekey = w.skey + pQuery->intervalTime - 1; + w.skey = st; + w.ekey = w.skey + pQuery->intervalTime - 1; + } } /* @@ -814,14 +885,22 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow */ if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNext->ekey) { TSKEY next = primaryKeys[startPos]; - - pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime; - pNext->skey = pNext->ekey - pQuery->intervalTime + 1; + if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { + pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); + pNext->ekey = addNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + } else { + pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime; + pNext->skey = pNext->ekey - pQuery->intervalTime + 1; + } } else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNext->skey) { TSKEY next = primaryKeys[startPos]; - - pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; - pNext->ekey = pNext->skey + pQuery->intervalTime - 1; + if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { + pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); + pNext->ekey = addNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + } else { + pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; + pNext->ekey = pNext->skey + pQuery->intervalTime - 1; + } } return startPos; @@ -1804,7 +1883,8 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6 if (keyFirst > (INT64_MAX - pQuery->intervalTime)) { assert(keyLast - keyFirst < pQuery->intervalTime); win->ekey = INT64_MAX; - return; + } else if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { + win->ekey = addNatualInterval(win->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; } else { win->ekey = win->skey + pQuery->intervalTime - 1; } @@ -6016,6 +6096,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQuery->pGroupbyExpr = pGroupbyExpr; pQuery->intervalTime = pQueryMsg->intervalTime; pQuery->slidingTime = pQueryMsg->slidingTime; + pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit; pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; pQuery->fillType = pQueryMsg->fillType; pQuery->numOfTags = pQueryMsg->numOfTags;