From 49d279e4f7b66e363e0fb192f7f621d0a37d24ec Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Tue, 22 Sep 2020 16:06:40 +0800 Subject: [PATCH] td-1262: stream support history data --- src/client/src/tscStream.c | 19 ++++------ tests/pytest/fulltest.sh | 1 + tests/pytest/stream/history.py | 63 ++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 12 deletions(-) create mode 100644 tests/pytest/stream/history.py diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 1ebef4ce0f..ea5532cf48 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -136,7 +136,6 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval; } else { etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision); - //etime = taosGetIntervalStartTimestamp(etime, pStream->interval.sliding, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision); } pQueryInfo->window.ekey = etime; if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) { @@ -454,17 +453,11 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in } } else { // timewindow based aggregation stream if (stime == 0) { // no data in meter till now - stime = pQueryInfo->window.skey; - if (stime == INT64_MIN) { - stime = (int64_t)taosGetTimestamp(pStream->precision); - stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision); - stime = taosTimeTruncate(stime - 1, &pStream->interval, pStream->precision); - //stime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision); - //stime = taosGetIntervalStartTimestamp(stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision); - tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime); + if (pQueryInfo->window.skey != INT64_MIN) { + stime = pQueryInfo->window.skey; } + stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision); } else { - //int64_t newStime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision); int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision); if (newStime != stime) { tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime); @@ -477,8 +470,10 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in } static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { - int64_t timer = pStream->stime - taosGetTimestamp(pStream->precision); - if (timer < 0) timer = 0; + int64_t timer = 0, now = taosGetTimestamp(pStream->precision); + if (pStream->stime > now) { + timer = pStream->stime - now; + } int64_t startDelay = (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay; diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index 9ebf1584e2..ab8593a781 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -154,6 +154,7 @@ python3 ./test.py -f stream/new.py python3 ./test.py -f stream/stream1.py python3 ./test.py -f stream/stream2.py python3 ./test.py -f stream/parser.py +python3 ./test.py -f stream/history.py #alter table python3 ./test.py -f alter/alter_table_crash.py diff --git a/tests/pytest/stream/history.py b/tests/pytest/stream/history.py new file mode 100644 index 0000000000..890580001c --- /dev/null +++ b/tests/pytest/stream/history.py @@ -0,0 +1,63 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import taos +from util.log import tdLog +from util.cases import tdCases +from util.sql import tdSql + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.prepare() + + tdSql.execute("create table cars(ts timestamp, s int) tags(id int)") + tdSql.execute("create table car0 using cars tags(0)") + tdSql.execute("create table car1 using cars tags(1)") + tdSql.execute("create table car2 using cars tags(2)") + tdSql.execute("create table car3 using cars tags(3)") + tdSql.execute("create table car4 using cars tags(4)") + + tdSql.execute("insert into car0 values('2019-01-01 00:00:00.103', 1)") + tdSql.execute("insert into car1 values('2019-01-01 00:00:00.234', 1)") + tdSql.execute("insert into car0 values('2019-01-01 00:00:01.012', 1)") + tdSql.execute("insert into car0 values('2019-01-01 00:00:02.003', 1)") + tdSql.execute("insert into car2 values('2019-01-01 00:00:02.328', 1)") + tdSql.execute("insert into car0 values('2019-01-01 00:00:03.139', 1)") + tdSql.execute("insert into car0 values('2019-01-01 00:00:04.348', 1)") + tdSql.execute("insert into car0 values('2019-01-01 00:00:05.783', 1)") + tdSql.execute("insert into car1 values('2019-01-01 00:00:01.893', 1)") + tdSql.execute("insert into car1 values('2019-01-01 00:00:02.712', 1)") + tdSql.execute("insert into car1 values('2019-01-01 00:00:03.982', 1)") + tdSql.execute("insert into car3 values('2019-01-01 00:00:01.389', 1)") + tdSql.execute("insert into car4 values('2019-01-01 00:00:01.829', 1)") + + tdSql.execute("create table strm as select count(*) from cars interval(4s)") + tdSql.waitedQuery("select * from strm", 2, 100) + tdSql.checkData(0, 1, 11) + tdSql.checkData(1, 1, 2) + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) -- GitLab