提交 49d279e4 编写于 作者: B Bomin Zhang

td-1262: stream support history data

上级 a1ce0a26
...@@ -136,7 +136,6 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { ...@@ -136,7 +136,6 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval; etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval;
} else { } else {
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision); etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision);
//etime = taosGetIntervalStartTimestamp(etime, pStream->interval.sliding, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision);
} }
pQueryInfo->window.ekey = etime; pQueryInfo->window.ekey = etime;
if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) { if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) {
...@@ -454,17 +453,11 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in ...@@ -454,17 +453,11 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
} }
} else { // timewindow based aggregation stream } else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now if (stime == 0) { // no data in meter till now
stime = pQueryInfo->window.skey; if (pQueryInfo->window.skey != INT64_MIN) {
if (stime == INT64_MIN) { stime = pQueryInfo->window.skey;
stime = (int64_t)taosGetTimestamp(pStream->precision);
stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
stime = taosTimeTruncate(stime - 1, &pStream->interval, pStream->precision);
//stime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
//stime = taosGetIntervalStartTimestamp(stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime);
} }
stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
} else { } else {
//int64_t newStime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision); int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
if (newStime != stime) { if (newStime != stime) {
tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime); tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime);
...@@ -477,8 +470,10 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in ...@@ -477,8 +470,10 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
} }
static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
int64_t timer = pStream->stime - taosGetTimestamp(pStream->precision); int64_t timer = 0, now = taosGetTimestamp(pStream->precision);
if (timer < 0) timer = 0; if (pStream->stime > now) {
timer = pStream->stime - now;
}
int64_t startDelay = int64_t startDelay =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay; (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay;
......
...@@ -154,6 +154,7 @@ python3 ./test.py -f stream/new.py ...@@ -154,6 +154,7 @@ python3 ./test.py -f stream/new.py
python3 ./test.py -f stream/stream1.py python3 ./test.py -f stream/stream1.py
python3 ./test.py -f stream/stream2.py python3 ./test.py -f stream/stream2.py
python3 ./test.py -f stream/parser.py python3 ./test.py -f stream/parser.py
python3 ./test.py -f stream/history.py
#alter table #alter table
python3 ./test.py -f alter/alter_table_crash.py python3 ./test.py -f alter/alter_table_crash.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
tdSql.execute("create table cars(ts timestamp, s int) tags(id int)")
tdSql.execute("create table car0 using cars tags(0)")
tdSql.execute("create table car1 using cars tags(1)")
tdSql.execute("create table car2 using cars tags(2)")
tdSql.execute("create table car3 using cars tags(3)")
tdSql.execute("create table car4 using cars tags(4)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:00.103', 1)")
tdSql.execute("insert into car1 values('2019-01-01 00:00:00.234', 1)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:01.012', 1)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:02.003', 1)")
tdSql.execute("insert into car2 values('2019-01-01 00:00:02.328', 1)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:03.139', 1)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:04.348', 1)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:05.783', 1)")
tdSql.execute("insert into car1 values('2019-01-01 00:00:01.893', 1)")
tdSql.execute("insert into car1 values('2019-01-01 00:00:02.712', 1)")
tdSql.execute("insert into car1 values('2019-01-01 00:00:03.982', 1)")
tdSql.execute("insert into car3 values('2019-01-01 00:00:01.389', 1)")
tdSql.execute("insert into car4 values('2019-01-01 00:00:01.829', 1)")
tdSql.execute("create table strm as select count(*) from cars interval(4s)")
tdSql.waitedQuery("select * from strm", 2, 100)
tdSql.checkData(0, 1, 11)
tdSql.checkData(1, 1, 2)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册