提交 6fa86bfb 编写于 作者: B Bomin Zhang

td-1245: fix crash and refactor

上级 85aca0c2
...@@ -366,8 +366,6 @@ typedef struct SSqlStream { ...@@ -366,8 +366,6 @@ typedef struct SSqlStream {
uint32_t streamId; uint32_t streamId;
char listed; char listed;
bool isProject; bool isProject;
char intervalTimeUnit;
char slidingTimeUnit;
int16_t precision; int16_t precision;
int64_t num; // number of computing count int64_t num; // number of computing count
...@@ -381,8 +379,7 @@ typedef struct SSqlStream { ...@@ -381,8 +379,7 @@ typedef struct SSqlStream {
int64_t ctime; // stream created time int64_t ctime; // stream created time
int64_t stime; // stream next executed time int64_t stime; // stream next executed time
int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed
int64_t intervalTime; SInterval interval;
int64_t slidingTime;
void * pTimer; void * pTimer;
void (*fp)(); void (*fp)();
......
...@@ -259,11 +259,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { ...@@ -259,11 +259,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
pSdesc->num = htobe64(pStream->num); pSdesc->num = htobe64(pStream->num);
pSdesc->useconds = htobe64(pStream->useconds); pSdesc->useconds = htobe64(pStream->useconds);
pSdesc->stime = htobe64(pStream->stime - pStream->intervalTime); pSdesc->stime = htobe64(pStream->stime - pStream->interval.interval);
pSdesc->ctime = htobe64(pStream->ctime); pSdesc->ctime = htobe64(pStream->ctime);
pSdesc->slidingTime = htobe64(pStream->slidingTime); pSdesc->slidingTime = htobe64(pStream->interval.sliding);
pSdesc->interval = htobe64(pStream->intervalTime); pSdesc->interval = htobe64(pStream->interval.interval);
pHeartbeat->numOfStreams++; pHeartbeat->numOfStreams++;
pSdesc++; pSdesc++;
......
...@@ -51,7 +51,7 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in ...@@ -51,7 +51,7 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in
int64_t retryDelta = (int64_t)(tsStreamCompRetryDelay * retryRangeFactor); int64_t retryDelta = (int64_t)(tsStreamCompRetryDelay * retryRangeFactor);
retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L; retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L;
if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') { if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
// change to ms // change to ms
if (prec == TSDB_TIME_PRECISION_MICRO) { if (prec == TSDB_TIME_PRECISION_MICRO) {
slidingTime = slidingTime / 1000; slidingTime = slidingTime / 1000;
...@@ -87,7 +87,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { ...@@ -87,7 +87,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
// failed to get meter/metric meta, retry in 10sec. // failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision); int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime); tscSetRetryTimer(pStream, pSql, retryDelayTime);
...@@ -132,15 +132,16 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { ...@@ -132,15 +132,16 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
} }
if (etime > pStream->etime) { if (etime > pStream->etime) {
etime = pStream->etime; etime = pStream->etime;
} else if (pStream->intervalTimeUnit != 'y' && pStream->intervalTimeUnit != 'n') { } else if (pStream->interval.intervalUnit != 'y' && pStream->interval.intervalUnit != 'n') {
etime = pStream->stime + (etime - pStream->stime) / pStream->intervalTime * pStream->intervalTime; etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval;
} else { } else {
etime = taosGetIntervalStartTimestamp(etime, pStream->slidingTime, pStream->intervalTime, pStream->slidingTimeUnit, pStream->precision); etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision);
//etime = taosGetIntervalStartTimestamp(etime, pStream->interval.sliding, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision);
} }
pQueryInfo->window.ekey = etime; pQueryInfo->window.ekey = etime;
if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) { if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) {
int64_t timer = pStream->slidingTime; int64_t timer = pStream->interval.sliding;
if (pStream->intervalTimeUnit == 'y' || pStream->intervalTimeUnit == 'n') { if (pStream->interval.intervalUnit == 'y' || pStream->interval.intervalUnit == 'n') {
timer = 86400 * 1000l; timer = 86400 * 1000l;
} else if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { } else if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
timer /= 1000l; timer /= 1000l;
...@@ -162,7 +163,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { ...@@ -162,7 +163,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) { static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) {
SSqlStream *pStream = (SSqlStream *)param; SSqlStream *pStream = (SSqlStream *)param;
if (tres == NULL || numOfRows < 0) { if (tres == NULL || numOfRows < 0) {
int64_t retryDelay = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision); int64_t retryDelay = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows, tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
retryDelay); retryDelay);
...@@ -223,7 +224,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -223,7 +224,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
SSqlObj * pSql = (SSqlObj *)res; SSqlObj * pSql = (SSqlObj *)res;
if (pSql == NULL || numOfRows < 0) { if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision); int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
...@@ -246,7 +247,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -246,7 +247,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
} }
if (!pStream->isProject) { if (!pStream->isProject) {
pStream->stime = taosTimeAdd(pStream->stime, pStream->slidingTime, pStream->slidingTimeUnit, pStream->precision); pStream->stime = taosTimeAdd(pStream->stime, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision);
} }
// actually only one row is returned. this following is not necessary // actually only one row is returned. this following is not necessary
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream); taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
...@@ -306,7 +307,7 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) ...@@ -306,7 +307,7 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
now + timer, timer, delay, pStream->stime, etime); now + timer, timer, delay, pStream->stime, etime);
} else { } else {
tscDebug("%p stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql, pStream, tscDebug("%p stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql, pStream,
pStream->stime, timer, delay, pStream->stime - pStream->intervalTime, pStream->stime - 1); pStream->stime, timer, delay, pStream->stime - pStream->interval.interval, pStream->stime - 1);
} }
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
...@@ -320,12 +321,12 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { ...@@ -320,12 +321,12 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay; (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay;
int64_t delayDelta = maxDelay; int64_t delayDelta = maxDelay;
if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') { if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
delayDelta = (int64_t)(pStream->slidingTime * tsStreamComputDelayRatio); delayDelta = (int64_t)(pStream->interval.sliding * tsStreamComputDelayRatio);
if (delayDelta > maxDelay) { if (delayDelta > maxDelay) {
delayDelta = maxDelay; delayDelta = maxDelay;
} }
int64_t remainTimeWindow = pStream->slidingTime - delayDelta; int64_t remainTimeWindow = pStream->interval.sliding - delayDelta;
if (maxDelay > remainTimeWindow) { if (maxDelay > remainTimeWindow) {
maxDelay = (int64_t)(remainTimeWindow / 1.5f); maxDelay = (int64_t)(remainTimeWindow / 1.5f);
} }
...@@ -333,8 +334,8 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { ...@@ -333,8 +334,8 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
int64_t currentDelay = (rand() % maxDelay); // a random number int64_t currentDelay = (rand() % maxDelay); // a random number
currentDelay += delayDelta; currentDelay += delayDelta;
if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') { if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
assert(currentDelay < pStream->slidingTime); assert(currentDelay < pStream->interval.sliding);
} }
return currentDelay; return currentDelay;
...@@ -349,7 +350,7 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { ...@@ -349,7 +350,7 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
* for project query, no mater fetch data successfully or not, next launch will issue * for project query, no mater fetch data successfully or not, next launch will issue
* more than the sliding time window * more than the sliding time window
*/ */
timer = pStream->slidingTime; timer = pStream->interval.sliding;
if (pStream->stime > pStream->etime) { if (pStream->stime > pStream->etime) {
tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream, tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream,
pStream->stime, pStream->etime); pStream->stime, pStream->etime);
...@@ -362,7 +363,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { ...@@ -362,7 +363,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
return; return;
} }
} else { } else {
int64_t stime = taosGetIntervalStartTimestamp(pStream->stime - 1, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); int64_t stime = taosTimeTruncate(pStream->stime - 1, &pStream->interval, pStream->precision);
//int64_t stime = taosGetIntervalStartTimestamp(pStream->stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
if (stime >= pStream->etime) { if (stime >= pStream->etime) {
tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream, tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream,
pStream->stime, pStream->etime); pStream->stime, pStream->etime);
...@@ -402,8 +404,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { ...@@ -402,8 +404,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
pQueryInfo->interval.interval = minIntervalTime; pQueryInfo->interval.interval = minIntervalTime;
} }
pStream->intervalTimeUnit = pQueryInfo->interval.intervalUnit; pStream->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
pStream->intervalTime = pQueryInfo->interval.interval; // it shall be derived from sql string pStream->interval.interval = pQueryInfo->interval.interval; // it shall be derived from sql string
if (pQueryInfo->interval.sliding <= 0) { if (pQueryInfo->interval.sliding <= 0) {
pQueryInfo->interval.sliding = pQueryInfo->interval.interval; pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
...@@ -427,8 +429,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { ...@@ -427,8 +429,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
pQueryInfo->interval.sliding = pQueryInfo->interval.interval; pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
} }
pStream->slidingTimeUnit = pQueryInfo->interval.slidingUnit; pStream->interval.slidingUnit = pQueryInfo->interval.slidingUnit;
pStream->slidingTime = pQueryInfo->interval.sliding; pStream->interval.sliding = pQueryInfo->interval.sliding;
if (pStream->isProject) { if (pStream->isProject) {
pQueryInfo->interval.interval = 0; // clear the interval value to avoid the force time window split by query processor pQueryInfo->interval.interval = 0; // clear the interval value to avoid the force time window split by query processor
...@@ -441,8 +443,8 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in ...@@ -441,8 +443,8 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
if (pStream->isProject) { if (pStream->isProject) {
// no data in table, flush all data till now to destination meter, 10sec delay // no data in table, flush all data till now to destination meter, 10sec delay
pStream->intervalTime = tsProjectExecInterval; pStream->interval.interval = tsProjectExecInterval;
pStream->slidingTime = tsProjectExecInterval; pStream->interval.sliding = tsProjectExecInterval;
if (stime != 0) { // first projection start from the latest event timestamp if (stime != 0) { // first projection start from the latest event timestamp
assert(stime >= pQueryInfo->window.skey); assert(stime >= pQueryInfo->window.skey);
...@@ -455,12 +457,15 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in ...@@ -455,12 +457,15 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
stime = pQueryInfo->window.skey; stime = pQueryInfo->window.skey;
if (stime == INT64_MIN) { if (stime == INT64_MIN) {
stime = (int64_t)taosGetTimestamp(pStream->precision); stime = (int64_t)taosGetTimestamp(pStream->precision);
stime = taosGetIntervalStartTimestamp(stime, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
stime = taosGetIntervalStartTimestamp(stime - 1, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); stime = taosTimeTruncate(stime - 1, &pStream->interval, pStream->precision);
//stime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
//stime = taosGetIntervalStartTimestamp(stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime); tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime);
} }
} else { } else {
int64_t newStime = taosGetIntervalStartTimestamp(stime, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); //int64_t newStime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
if (newStime != stime) { if (newStime != stime) {
tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime); tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime);
stime = newStime; stime = newStime;
...@@ -530,7 +535,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -530,7 +535,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer); taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
pStream, pTableMetaInfo->name, pStream->intervalTime, pStream->slidingTime, starttime, pSql->sqlstr); pStream, pTableMetaInfo->name, pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr);
} }
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
......
...@@ -35,7 +35,6 @@ bool tscValidateTableNameLength(size_t len); ...@@ -35,7 +35,6 @@ bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters); SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
int32_t taosCountNatualInterval(int64_t skey, int64_t ekey, int64_t intervalTime, char timeUnit, int16_t precision); // int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision);
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision);
#endif // TDENGINE_NAME_H #endif // TDENGINE_NAME_H
...@@ -99,35 +99,7 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO ...@@ -99,35 +99,7 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO
return pFilter; return pFilter;
} }
int32_t taosCountNatualInterval(int64_t skey, int64_t ekey, int64_t intervalTime, char timeUnit, int16_t precision) { #if 0
skey /= 1000;
ekey /= 1000;
if (precision == TSDB_TIME_PRECISION_MICRO) {
skey /= 1000;
ekey /= 1000;
}
if (ekey < skey) {
int64_t tmp = ekey;
ekey = skey;
skey = tmp;
}
struct tm tm;
time_t t = (time_t)skey;
localtime_r(&t, &tm);
int smon = tm.tm_year * 12 + tm.tm_mon;
t = (time_t)ekey;
localtime_r(&t, &tm);
int emon = tm.tm_year * 12 + tm.tm_mon;
if (timeUnit == 'y') {
intervalTime *= 12;
}
return (emon - smon) / (int32_t)intervalTime;
}
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) { int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
if (slidingTime == 0) { if (slidingTime == 0) {
return startTime; return startTime;
...@@ -192,6 +164,8 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in ...@@ -192,6 +164,8 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
return start; return start;
} }
#endif
/* /*
* tablePrefix.columnName * tablePrefix.columnName
* extract table name and save it in pTable, with only column name in pToken * extract table name and save it in pTable, with only column name in pToken
......
...@@ -101,6 +101,7 @@ extern const int32_t TYPE_BYTES[11]; ...@@ -101,6 +101,7 @@ extern const int32_t TYPE_BYTES[11];
#define TSDB_TIME_PRECISION_MILLI 0 #define TSDB_TIME_PRECISION_MILLI 0
#define TSDB_TIME_PRECISION_MICRO 1 #define TSDB_TIME_PRECISION_MICRO 1
#define TSDB_TIME_PRECISION_NANO 2 #define TSDB_TIME_PRECISION_NANO 2
#define TSDB_TICK_PER_SECOND(precision) ((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L))
#define TSDB_TIME_PRECISION_MILLI_STR "ms" #define TSDB_TIME_PRECISION_MILLI_STR "ms"
#define TSDB_TIME_PRECISION_MICRO_STR "us" #define TSDB_TIME_PRECISION_MICRO_STR "us"
......
...@@ -75,6 +75,7 @@ typedef struct SInterval { ...@@ -75,6 +75,7 @@ typedef struct SInterval {
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision); int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts); int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts);
int32_t parseDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit); int32_t parseDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit);
......
...@@ -413,24 +413,43 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { ...@@ -413,24 +413,43 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
return t + duration; return t + duration;
} }
t /= 1000;
if (precision == TSDB_TIME_PRECISION_MICRO) {
t /= 1000;
}
struct tm tm; struct tm tm;
time_t tt = (time_t)t; time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision));
localtime_r(&tt, &tm); localtime_r(&tt, &tm);
int mon = tm.tm_year * 12 + tm.tm_mon + (int)duration; int mon = tm.tm_year * 12 + tm.tm_mon + (int)duration;
tm.tm_year = mon / 12; tm.tm_year = mon / 12;
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
t = mktime(&tm) * 1000L; return mktime(&tm) * TSDB_TICK_PER_SECOND(precision);
if (precision == TSDB_TIME_PRECISION_MICRO) { }
t *= 1000L;
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) {
if (ekey < skey) {
int64_t tmp = ekey;
ekey = skey;
skey = tmp;
} }
if (unit != 'n' && unit != 'y') {
return (int32_t)((ekey - skey) / interval);
}
skey /= TSDB_TICK_PER_SECOND(precision);
ekey /= TSDB_TICK_PER_SECOND(precision);
return t; struct tm tm;
time_t t = (time_t)skey;
localtime_r(&t, &tm);
int smon = tm.tm_year * 12 + tm.tm_mon;
t = (time_t)ekey;
localtime_r(&t, &tm);
int emon = tm.tm_year * 12 + tm.tm_mon;
if (unit == 'y') {
interval *= 12;
}
return (emon - smon) / (int32_t)interval;
} }
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision) { int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision) {
...@@ -440,14 +459,11 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -440,14 +459,11 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
} }
int64_t start = t; int64_t start = t;
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { if (pInterval->slidingUnit == 'n' || pInterval->slidingUnit == 'y') {
start /= 1000; start /= TSDB_TICK_PER_SECOND(precision);
if (precision == TSDB_TIME_PRECISION_MICRO) {
start /= 1000;
}
struct tm tm; struct tm tm;
time_t t = (time_t)start; time_t tt = (time_t)start;
localtime_r(&t, &tm); localtime_r(&tt, &tm);
tm.tm_sec = 0; tm.tm_sec = 0;
tm.tm_min = 0; tm.tm_min = 0;
tm.tm_hour = 0; tm.tm_hour = 0;
...@@ -463,10 +479,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -463,10 +479,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
} }
start = mktime(&tm) * 1000L; start = mktime(&tm) * TSDB_TICK_PER_SECOND(precision);
if (precision == TSDB_TIME_PRECISION_MICRO) {
start *= 1000L;
}
} else { } else {
int64_t delta = t - pInterval->interval; int64_t delta = t - pInterval->interval;
int32_t factor = delta > 0 ? 1 : -1; int32_t factor = delta > 0 ? 1 : -1;
...@@ -486,8 +499,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -486,8 +499,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
char** tzname = _tzname; char** tzname = _tzname;
#endif #endif
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L; start += timezone * TSDB_TICK_PER_SECOND(precision);
start += timezone * t;
} }
int64_t end = start + pInterval->interval - 1; int64_t end = start + pInterval->interval - 1;
...@@ -496,7 +508,13 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -496,7 +508,13 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
} }
} }
return taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision); if (pInterval->offset > 0) {
start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision);
if (start > t) {
start = taosTimeAdd(start, -pInterval->interval, pInterval->intervalUnit, precision);
}
}
return start;
} }
// internal function, when program is paused in debugger, // internal function, when program is paused in debugger,
...@@ -507,24 +525,38 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -507,24 +525,38 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
// 2020-07-03 17:48:42 // 2020-07-03 17:48:42
// and the parameter can also be a variable. // and the parameter can also be a variable.
const char* fmtts(int64_t ts) { const char* fmtts(int64_t ts) {
static char buf[32]; static char buf[96];
size_t pos = 0;
struct tm tm;
time_t tt;
if (ts > -62135625943 && ts < 32503651200) { if (ts > -62135625943 && ts < 32503651200) {
tt = ts; time_t t = (time_t)ts;
} else if (ts > -62135625943000 && ts < 32503651200000) { localtime_r(&t, &tm);
tt = ts / 1000; pos += strftime(buf + pos, sizeof(buf), "s=%Y-%m-%d %H:%M:%S", &tm);
} else {
tt = ts / 1000000;
} }
struct tm* ptm = localtime(&tt); if (ts > -62135625943000 && ts < 32503651200000) {
size_t pos = strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", ptm); time_t t = (time_t)(ts / 1000);
localtime_r(&t, &tm);
if (pos > 0) {
buf[pos++] = ' ';
buf[pos++] = '|';
buf[pos++] = ' ';
}
pos += strftime(buf + pos, sizeof(buf), "ms=%Y-%m-%d %H:%M:%S", &tm);
pos += sprintf(buf + pos, ".%03d", (int)(ts % 1000));
}
if (ts <= -62135625943000 || ts >= 32503651200000) { {
sprintf(buf + pos, ".%06d", (int)(ts % 1000000)); time_t t = (time_t)(ts / 1000000);
} else if (ts <= -62135625943 || ts >= 32503651200) { localtime_r(&t, &tm);
sprintf(buf + pos, ".%03d", (int)(ts % 1000)); if (pos > 0) {
buf[pos++] = ' ';
buf[pos++] = '|';
buf[pos++] = ' ';
}
pos += strftime(buf + pos, sizeof(buf), "us=%Y-%m-%d %H:%M:%S", &tm);
pos += sprintf(buf + pos, ".%06d", (int)(ts % 1000000));
} }
return buf; return buf;
......
...@@ -550,8 +550,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t ...@@ -550,8 +550,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
w.ekey = pQuery->window.ekey; w.ekey = pQuery->window.ekey;
} }
assert(ts >= w.skey && ts <= w.ekey);
return w; return w;
} }
......
...@@ -174,24 +174,26 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows ...@@ -174,24 +174,26 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows
int64_t numOfRes = -1; int64_t numOfRes = -1;
if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set. if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set.
TSKEY lastKey = tsList[pFillInfo->numOfRows - 1]; TSKEY lastKey = tsList[pFillInfo->numOfRows - 1];
numOfRes = taosTimeCountInterval(
if (pFillInfo->interval.slidingUnit != 'y' && pFillInfo->interval.slidingUnit != 'n') { lastKey,
numOfRes = (int64_t)(ABS(lastKey - pFillInfo->start) / pFillInfo->interval.sliding) + 1; pFillInfo->start,
} else { pFillInfo->interval.sliding,
numOfRes = taosCountNatualInterval(lastKey, pFillInfo->start, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->precision) + 1; pFillInfo->interval.slidingUnit,
} pFillInfo->precision);
numOfRes += 1;
assert(numOfRes >= numOfRows); assert(numOfRes >= numOfRows);
} else { // reach the end of data } else { // reach the end of data
if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) || if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) ||
(ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) { (ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) {
return 0; return 0;
} }
// the numOfRes rows are all filled with specified policy numOfRes = taosTimeCountInterval(
if (pFillInfo->interval.slidingUnit != 'y' && pFillInfo->interval.slidingUnit != 'n') { ekey1,
numOfRes = (ABS(ekey1 - pFillInfo->start) / pFillInfo->interval.sliding) + 1; pFillInfo->start,
} else { pFillInfo->interval.sliding,
numOfRes = taosCountNatualInterval(ekey1, pFillInfo->start, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->precision) + 1; pFillInfo->interval.slidingUnit,
} pFillInfo->precision);
numOfRes += 1;
} }
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes; return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册