diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 17c4e7fc5545a566e3176fd6f695a32d854e3a8d..023594d8659ec8dcbc5db182959e08c09acf086f 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -366,8 +366,6 @@ typedef struct SSqlStream { uint32_t streamId; char listed; bool isProject; - char intervalTimeUnit; - char slidingTimeUnit; int16_t precision; int64_t num; // number of computing count @@ -381,8 +379,7 @@ typedef struct SSqlStream { int64_t ctime; // stream created time int64_t stime; // stream next executed time int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed - int64_t intervalTime; - int64_t slidingTime; + SInterval interval; void * pTimer; void (*fp)(); diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index b8c38302046c8df4ea33d98633dd82c4e3a0ff40..f2f997a59494ab7ab4e8da31edb4b819730e30c4 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -259,11 +259,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { pSdesc->num = htobe64(pStream->num); pSdesc->useconds = htobe64(pStream->useconds); - pSdesc->stime = htobe64(pStream->stime - pStream->intervalTime); + pSdesc->stime = htobe64(pStream->stime - pStream->interval.interval); pSdesc->ctime = htobe64(pStream->ctime); - pSdesc->slidingTime = htobe64(pStream->slidingTime); - pSdesc->interval = htobe64(pStream->intervalTime); + pSdesc->slidingTime = htobe64(pStream->interval.sliding); + pSdesc->interval = htobe64(pStream->interval.interval); pHeartbeat->numOfStreams++; pSdesc++; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index a1b4ba0bf4994b89758380debef91c3114c361a6..4a1f4d9d87ff25450826cbf4715d3200685d1fe7 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -51,7 +51,7 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in int64_t retryDelta = (int64_t)(tsStreamCompRetryDelay * retryRangeFactor); retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L; - if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') { + if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { // change to ms if (prec == TSDB_TIME_PRECISION_MICRO) { slidingTime = slidingTime / 1000; @@ -87,7 +87,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { // failed to get meter/metric meta, retry in 10sec. if (code != TSDB_CODE_SUCCESS) { - int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision); + int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision); tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); tscSetRetryTimer(pStream, pSql, retryDelayTime); @@ -132,15 +132,16 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { } if (etime > pStream->etime) { etime = pStream->etime; - } else if (pStream->intervalTimeUnit != 'y' && pStream->intervalTimeUnit != 'n') { - etime = pStream->stime + (etime - pStream->stime) / pStream->intervalTime * pStream->intervalTime; + } else if (pStream->interval.intervalUnit != 'y' && pStream->interval.intervalUnit != 'n') { + etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval; } else { - etime = taosGetIntervalStartTimestamp(etime, pStream->slidingTime, pStream->intervalTime, pStream->slidingTimeUnit, pStream->precision); + etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision); + //etime = taosGetIntervalStartTimestamp(etime, pStream->interval.sliding, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision); } pQueryInfo->window.ekey = etime; if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) { - int64_t timer = pStream->slidingTime; - if (pStream->intervalTimeUnit == 'y' || pStream->intervalTimeUnit == 'n') { + int64_t timer = pStream->interval.sliding; + if (pStream->interval.intervalUnit == 'y' || pStream->interval.intervalUnit == 'n') { timer = 86400 * 1000l; } else if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { timer /= 1000l; @@ -162,7 +163,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) { SSqlStream *pStream = (SSqlStream *)param; if (tres == NULL || numOfRows < 0) { - int64_t retryDelay = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision); + int64_t retryDelay = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision); tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows, retryDelay); @@ -223,7 +224,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf SSqlObj * pSql = (SSqlObj *)res; if (pSql == NULL || numOfRows < 0) { - int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision); + int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision); tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); @@ -246,7 +247,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf } if (!pStream->isProject) { - pStream->stime = taosTimeAdd(pStream->stime, pStream->slidingTime, pStream->slidingTimeUnit, pStream->precision); + pStream->stime = taosTimeAdd(pStream->stime, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision); } // actually only one row is returned. this following is not necessary taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream); @@ -306,7 +307,7 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) now + timer, timer, delay, pStream->stime, etime); } else { tscDebug("%p stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql, pStream, - pStream->stime, timer, delay, pStream->stime - pStream->intervalTime, pStream->stime - 1); + pStream->stime, timer, delay, pStream->stime - pStream->interval.interval, pStream->stime - 1); } pSql->cmd.command = TSDB_SQL_SELECT; @@ -320,12 +321,12 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay; int64_t delayDelta = maxDelay; - if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') { - delayDelta = (int64_t)(pStream->slidingTime * tsStreamComputDelayRatio); + if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { + delayDelta = (int64_t)(pStream->interval.sliding * tsStreamComputDelayRatio); if (delayDelta > maxDelay) { delayDelta = maxDelay; } - int64_t remainTimeWindow = pStream->slidingTime - delayDelta; + int64_t remainTimeWindow = pStream->interval.sliding - delayDelta; if (maxDelay > remainTimeWindow) { maxDelay = (int64_t)(remainTimeWindow / 1.5f); } @@ -333,8 +334,8 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { int64_t currentDelay = (rand() % maxDelay); // a random number currentDelay += delayDelta; - if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') { - assert(currentDelay < pStream->slidingTime); + if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { + assert(currentDelay < pStream->interval.sliding); } return currentDelay; @@ -349,7 +350,7 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { * for project query, no mater fetch data successfully or not, next launch will issue * more than the sliding time window */ - timer = pStream->slidingTime; + timer = pStream->interval.sliding; if (pStream->stime > pStream->etime) { tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream, pStream->stime, pStream->etime); @@ -362,7 +363,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { return; } } else { - int64_t stime = taosGetIntervalStartTimestamp(pStream->stime - 1, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); + int64_t stime = taosTimeTruncate(pStream->stime - 1, &pStream->interval, pStream->precision); + //int64_t stime = taosGetIntervalStartTimestamp(pStream->stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision); if (stime >= pStream->etime) { tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream, pStream->stime, pStream->etime); @@ -402,8 +404,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { pQueryInfo->interval.interval = minIntervalTime; } - pStream->intervalTimeUnit = pQueryInfo->interval.intervalUnit; - pStream->intervalTime = pQueryInfo->interval.interval; // it shall be derived from sql string + pStream->interval.intervalUnit = pQueryInfo->interval.intervalUnit; + pStream->interval.interval = pQueryInfo->interval.interval; // it shall be derived from sql string if (pQueryInfo->interval.sliding <= 0) { pQueryInfo->interval.sliding = pQueryInfo->interval.interval; @@ -427,8 +429,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { pQueryInfo->interval.sliding = pQueryInfo->interval.interval; } - pStream->slidingTimeUnit = pQueryInfo->interval.slidingUnit; - pStream->slidingTime = pQueryInfo->interval.sliding; + pStream->interval.slidingUnit = pQueryInfo->interval.slidingUnit; + pStream->interval.sliding = pQueryInfo->interval.sliding; if (pStream->isProject) { pQueryInfo->interval.interval = 0; // clear the interval value to avoid the force time window split by query processor @@ -441,8 +443,8 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in if (pStream->isProject) { // no data in table, flush all data till now to destination meter, 10sec delay - pStream->intervalTime = tsProjectExecInterval; - pStream->slidingTime = tsProjectExecInterval; + pStream->interval.interval = tsProjectExecInterval; + pStream->interval.sliding = tsProjectExecInterval; if (stime != 0) { // first projection start from the latest event timestamp assert(stime >= pQueryInfo->window.skey); @@ -455,12 +457,15 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in stime = pQueryInfo->window.skey; if (stime == INT64_MIN) { stime = (int64_t)taosGetTimestamp(pStream->precision); - stime = taosGetIntervalStartTimestamp(stime, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); - stime = taosGetIntervalStartTimestamp(stime - 1, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); + stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision); + stime = taosTimeTruncate(stime - 1, &pStream->interval, pStream->precision); + //stime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision); + //stime = taosGetIntervalStartTimestamp(stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision); tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime); } } else { - int64_t newStime = taosGetIntervalStartTimestamp(stime, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); + //int64_t newStime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision); + int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision); if (newStime != stime) { tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime); stime = newStime; @@ -530,7 +535,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer); tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, - pStream, pTableMetaInfo->name, pStream->intervalTime, pStream->slidingTime, starttime, pSql->sqlstr); + pStream, pTableMetaInfo->name, pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr); } TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), diff --git a/src/common/inc/tname.h b/src/common/inc/tname.h index b21d93fe8d24b18a39cc098007051603967c36ea..6b73d98b81fa8cbcce8d322f566d8b6709396f1c 100644 --- a/src/common/inc/tname.h +++ b/src/common/inc/tname.h @@ -35,7 +35,6 @@ bool tscValidateTableNameLength(size_t len); SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters); -int32_t taosCountNatualInterval(int64_t skey, int64_t ekey, int64_t intervalTime, char timeUnit, int16_t precision); -int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision); +// int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision); #endif // TDENGINE_NAME_H diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 4193b770d25c2aa6b287e4cd1b06e5831a683c83..8879e9e7979be5b019b3048389105faa86ac9225 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -99,35 +99,7 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO return pFilter; } -int32_t taosCountNatualInterval(int64_t skey, int64_t ekey, int64_t intervalTime, char timeUnit, int16_t precision) { - skey /= 1000; - ekey /= 1000; - if (precision == TSDB_TIME_PRECISION_MICRO) { - skey /= 1000; - ekey /= 1000; - } - if (ekey < skey) { - int64_t tmp = ekey; - ekey = skey; - skey = tmp; - } - - struct tm tm; - time_t t = (time_t)skey; - localtime_r(&t, &tm); - int smon = tm.tm_year * 12 + tm.tm_mon; - - t = (time_t)ekey; - localtime_r(&t, &tm); - int emon = tm.tm_year * 12 + tm.tm_mon; - - if (timeUnit == 'y') { - intervalTime *= 12; - } - - return (emon - smon) / (int32_t)intervalTime; -} - +#if 0 int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) { if (slidingTime == 0) { return startTime; @@ -192,6 +164,8 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in return start; } +#endif + /* * tablePrefix.columnName * extract table name and save it in pTable, with only column name in pToken diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 17cbbca83459b37da80490558c5639e5d5780c34..9a411d8f858b9f6f90651e8f4d40d6f3c46a02f5 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -101,6 +101,7 @@ extern const int32_t TYPE_BYTES[11]; #define TSDB_TIME_PRECISION_MILLI 0 #define TSDB_TIME_PRECISION_MICRO 1 #define TSDB_TIME_PRECISION_NANO 2 +#define TSDB_TICK_PER_SECOND(precision) ((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)) #define TSDB_TIME_PRECISION_MILLI_STR "ms" #define TSDB_TIME_PRECISION_MICRO_STR "us" diff --git a/src/os/inc/osTime.h b/src/os/inc/osTime.h index c03fc1c36f242b14433c0b006989526de94e4413..2bc24caa14bc2b9c2022c37099af658cb0728255 100644 --- a/src/os/inc/osTime.h +++ b/src/os/inc/osTime.h @@ -75,6 +75,7 @@ typedef struct SInterval { int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision); +int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision); int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts); int32_t parseDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit); diff --git a/src/os/src/detail/osTime.c b/src/os/src/detail/osTime.c index 21b972c7c8ada1a7aaf525f0561072f88e02decd..5e6922cd512ed78e7e26de4941f55caf2516dee6 100644 --- a/src/os/src/detail/osTime.c +++ b/src/os/src/detail/osTime.c @@ -413,24 +413,43 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { return t + duration; } - t /= 1000; - if (precision == TSDB_TIME_PRECISION_MICRO) { - t /= 1000; - } - struct tm tm; - time_t tt = (time_t)t; + time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision)); localtime_r(&tt, &tm); int mon = tm.tm_year * 12 + tm.tm_mon + (int)duration; tm.tm_year = mon / 12; tm.tm_mon = mon % 12; - t = mktime(&tm) * 1000L; - if (precision == TSDB_TIME_PRECISION_MICRO) { - t *= 1000L; + return mktime(&tm) * TSDB_TICK_PER_SECOND(precision); +} + +int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) { + if (ekey < skey) { + int64_t tmp = ekey; + ekey = skey; + skey = tmp; } + if (unit != 'n' && unit != 'y') { + return (int32_t)((ekey - skey) / interval); + } + + skey /= TSDB_TICK_PER_SECOND(precision); + ekey /= TSDB_TICK_PER_SECOND(precision); - return t; + struct tm tm; + time_t t = (time_t)skey; + localtime_r(&t, &tm); + int smon = tm.tm_year * 12 + tm.tm_mon; + + t = (time_t)ekey; + localtime_r(&t, &tm); + int emon = tm.tm_year * 12 + tm.tm_mon; + + if (unit == 'y') { + interval *= 12; + } + + return (emon - smon) / (int32_t)interval; } int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision) { @@ -440,14 +459,11 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio } int64_t start = t; - if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { - start /= 1000; - if (precision == TSDB_TIME_PRECISION_MICRO) { - start /= 1000; - } + if (pInterval->slidingUnit == 'n' || pInterval->slidingUnit == 'y') { + start /= TSDB_TICK_PER_SECOND(precision); struct tm tm; - time_t t = (time_t)start; - localtime_r(&t, &tm); + time_t tt = (time_t)start; + localtime_r(&tt, &tm); tm.tm_sec = 0; tm.tm_min = 0; tm.tm_hour = 0; @@ -463,10 +479,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio tm.tm_mon = mon % 12; } - start = mktime(&tm) * 1000L; - if (precision == TSDB_TIME_PRECISION_MICRO) { - start *= 1000L; - } + start = mktime(&tm) * TSDB_TICK_PER_SECOND(precision); } else { int64_t delta = t - pInterval->interval; int32_t factor = delta > 0 ? 1 : -1; @@ -486,8 +499,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio char** tzname = _tzname; #endif - int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L; - start += timezone * t; + start += timezone * TSDB_TICK_PER_SECOND(precision); } int64_t end = start + pInterval->interval - 1; @@ -496,7 +508,13 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio } } - return taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision); + if (pInterval->offset > 0) { + start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision); + if (start > t) { + start = taosTimeAdd(start, -pInterval->interval, pInterval->intervalUnit, precision); + } + } + return start; } // internal function, when program is paused in debugger, @@ -507,24 +525,38 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio // 2020-07-03 17:48:42 // and the parameter can also be a variable. const char* fmtts(int64_t ts) { - static char buf[32]; + static char buf[96]; + size_t pos = 0; + struct tm tm; - time_t tt; if (ts > -62135625943 && ts < 32503651200) { - tt = ts; - } else if (ts > -62135625943000 && ts < 32503651200000) { - tt = ts / 1000; - } else { - tt = ts / 1000000; + time_t t = (time_t)ts; + localtime_r(&t, &tm); + pos += strftime(buf + pos, sizeof(buf), "s=%Y-%m-%d %H:%M:%S", &tm); } - struct tm* ptm = localtime(&tt); - size_t pos = strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", ptm); + if (ts > -62135625943000 && ts < 32503651200000) { + time_t t = (time_t)(ts / 1000); + localtime_r(&t, &tm); + if (pos > 0) { + buf[pos++] = ' '; + buf[pos++] = '|'; + buf[pos++] = ' '; + } + pos += strftime(buf + pos, sizeof(buf), "ms=%Y-%m-%d %H:%M:%S", &tm); + pos += sprintf(buf + pos, ".%03d", (int)(ts % 1000)); + } - if (ts <= -62135625943000 || ts >= 32503651200000) { - sprintf(buf + pos, ".%06d", (int)(ts % 1000000)); - } else if (ts <= -62135625943 || ts >= 32503651200) { - sprintf(buf + pos, ".%03d", (int)(ts % 1000)); + { + time_t t = (time_t)(ts / 1000000); + localtime_r(&t, &tm); + if (pos > 0) { + buf[pos++] = ' '; + buf[pos++] = '|'; + buf[pos++] = ' '; + } + pos += strftime(buf + pos, sizeof(buf), "us=%Y-%m-%d %H:%M:%S", &tm); + pos += sprintf(buf + pos, ".%06d", (int)(ts % 1000000)); } return buf; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 64d05f2205879b12d4a25fb50b2f7ef80e9c234d..e26eeefb6a4b334aa81ac4a070558f8e80e1159b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -550,8 +550,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t w.ekey = pQuery->window.ekey; } - assert(ts >= w.skey && ts <= w.ekey); - return w; } diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index d0ecd22739e6363fa66ac4b925f8734aa296c46c..8660d9f4fe59b72aa88a058c922369d313e49b48 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -174,24 +174,26 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows int64_t numOfRes = -1; if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set. TSKEY lastKey = tsList[pFillInfo->numOfRows - 1]; - - if (pFillInfo->interval.slidingUnit != 'y' && pFillInfo->interval.slidingUnit != 'n') { - numOfRes = (int64_t)(ABS(lastKey - pFillInfo->start) / pFillInfo->interval.sliding) + 1; - } else { - numOfRes = taosCountNatualInterval(lastKey, pFillInfo->start, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->precision) + 1; - } + numOfRes = taosTimeCountInterval( + lastKey, + pFillInfo->start, + pFillInfo->interval.sliding, + pFillInfo->interval.slidingUnit, + pFillInfo->precision); + numOfRes += 1; assert(numOfRes >= numOfRows); } else { // reach the end of data if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) || (ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) { return 0; } - // the numOfRes rows are all filled with specified policy - if (pFillInfo->interval.slidingUnit != 'y' && pFillInfo->interval.slidingUnit != 'n') { - numOfRes = (ABS(ekey1 - pFillInfo->start) / pFillInfo->interval.sliding) + 1; - } else { - numOfRes = taosCountNatualInterval(ekey1, pFillInfo->start, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->precision) + 1; - } + numOfRes = taosTimeCountInterval( + ekey1, + pFillInfo->start, + pFillInfo->interval.sliding, + pFillInfo->interval.slidingUnit, + pFillInfo->precision); + numOfRes += 1; } return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;