提交 94b021ce 编写于 作者: A Alex Duan

fixed ekey include like 18:00

上级 69c913dc
......@@ -160,18 +160,19 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
} else {
pQueryInfo->window.skey = pStream->stime;
int64_t etime = taosGetTimestamp(pStream->precision);
int64_t one = convertTimePrecision(1, TSDB_TIME_PRECISION_MILLI, pStream->precision);
// delay to wait all data in last time window
etime -= convertTimePrecision(tsMaxStreamComputDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision);
if (etime > pStream->etime) {
etime = pStream->etime;
} else if (pStream->interval.intervalUnit != 'y' && pStream->interval.intervalUnit != 'n') {
if(pStream->stime == INT64_MIN) {
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision);
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision) - one;
} else {
etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval;
etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval - one;
}
} else {
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision);
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision) - one;
}
pQueryInfo->window.ekey = etime;
if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) {
......@@ -202,7 +203,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
}
}
tscDebug("CQ ProcessStreamTimer skey=%" PRId64 " ekey=%" PRId64 " stime=%" PRId64 " endtime=%" PRId64, pQueryInfo->window.skey, pQueryInfo->window.ekey, pStream->stime, pStream->etime);
tscDebug("CQ ProcessStreamTimer skey=%" PRId64 " ekey=%" PRId64 " stime=%" PRId64 " etime=%" PRId64, pQueryInfo->window.skey, pQueryInfo->window.ekey, pStream->stime, pStream->etime);
// launch stream computing in a new thread
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessStreamLaunchQuery;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册