diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index fbcf891b4e74d1da8e8a06e4cf42e3c3c038ce58..5c3c12caba9f0ab324fb44802a660fc66d4b1b8b 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -273,7 +273,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { pSdesc->num = htobe64(pStream->num); pSdesc->useconds = htobe64(pStream->useconds); - pSdesc->stime = htobe64(pStream->stime - pStream->interval.interval); + pSdesc->stime = (pStream->stime == INT64_MIN) ? htobe64(pStream->stime) : htobe64(pStream->stime - pStream->interval.interval); pSdesc->ctime = htobe64(pStream->ctime); pSdesc->slidingTime = htobe64(pStream->interval.sliding); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index d0e778bddca9235824ce8c6988fd691d1f6d47f0..02d8b7957ee84aaae3ed689c94b84feab9421904 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -402,10 +402,12 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { taos_close_stream(pStream); return; } - - timer = pStream->stime - taosGetTimestamp(pStream->precision); - if (timer < 0) { - timer = 0; + + if (pStream->stime > 0) { + timer = pStream->stime - taosGetTimestamp(pStream->precision); + if (timer < 0) { + timer = 0; + } } } @@ -473,6 +475,10 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + if (stime == INT64_MIN) { + return stime; + } if (pStream->isProject) { // no data in table, flush all data till now to destination meter, 10sec delay diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index fb683786adee33c62ba7c4af1642c9a4e83a4406..5d5d5f339eec62db09f4c93acad0cea096c0962d 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -449,7 +449,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->tmrId = 0; if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, (void *)pObj->rid, NULL); + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL); // TODO the pObj->pStream may be released if error happens if (pObj->pStream) { diff --git a/tests/pytest/stream/new.py b/tests/pytest/stream/new.py index 70f300e937b2ed422231a774d2bca1183ce6fd0a..4a0e47c01ad9f9aac7ed78be0ff4fc93fc0d41ed 100644 --- a/tests/pytest/stream/new.py +++ b/tests/pytest/stream/new.py @@ -42,7 +42,7 @@ class TDTestCase: tdLog.info("=============== step3") start = time.time() - tdSql.waitedQuery("select * from st", 1, 120) + tdSql.waitedQuery("select * from st", 1, 180) delay = int(time.time() - start) + 80 v = tdSql.getData(0, 3) if v >= 51: diff --git a/tests/pytest/stream/stream2.py b/tests/pytest/stream/stream2.py index d71742048a45774fbe6909093840f265b829eb4c..9b4eb8725c96f95196f251c55b0b773cd68e9ed5 100644 --- a/tests/pytest/stream/stream2.py +++ b/tests/pytest/stream/stream2.py @@ -88,6 +88,8 @@ class TDTestCase: except Exception as e: tdLog.info(repr(e)) + + time.sleep(5) tdSql.query("show streams") tdSql.checkRows(1) tdSql.checkData(0, 2, 's0') @@ -146,6 +148,7 @@ class TDTestCase: except Exception as e: tdLog.info(repr(e)) + time.sleep(5) tdSql.query("show streams") tdSql.checkRows(2) tdSql.checkData(0, 2, 's1') diff --git a/tests/pytest/stream/sys.py b/tests/pytest/stream/sys.py index a73e7043e8c65b2eb9c78fbcb99d4e546ddf9ae4..c9a3fccfe68b61da722dcdb2ccab63bf3d5bcabc 100644 --- a/tests/pytest/stream/sys.py +++ b/tests/pytest/stream/sys.py @@ -47,7 +47,7 @@ class TDTestCase: "select * from iostrm", ] for sql in sqls: - (rows, _) = tdSql.waitedQuery(sql, 1, 120) + (rows, _) = tdSql.waitedQuery(sql, 1, 240) if rows < 1: tdLog.exit("failed: sql:%s, expect at least one row" % sql) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 8cda72fb38061ede39d22178a739e10a784c8468..ba9cb4d53d2ee733f4b087d7c4067e816cf8d112 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -87,6 +87,7 @@ class TDSql: self.queryResult = self.cursor.fetchall() self.queryRows = len(self.queryResult) self.queryCols = len(self.cursor.description) + tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectRows, self.queryRows)) if self.queryRows >= expectRows: return (self.queryRows, i) time.sleep(1) diff --git a/tests/script/general/stream/stream_1970.sim b/tests/script/general/stream/stream_1970.sim new file mode 100644 index 0000000000000000000000000000000000000000..30a733c08ff37d193df688ee956f3c5911671ddf --- /dev/null +++ b/tests/script/general/stream/stream_1970.sim @@ -0,0 +1,73 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 1 +system sh/exec.sh -n dnode1 -s start + +sleep 2000 +sql connect + +print ======================== dnode1 start + +$dbPrefix = s3_db +$tbPrefix = s3_tb +$mtPrefix = s3_mt +$stPrefix = s3_st +$tbNum = 10 +$rowNum = 20 +$totalNum = 200 + +print =============== step1 +$i = 0 +$db = $dbPrefix . $i +$mt = $mtPrefix . $i +$st = $stPrefix . $i + +sql drop databae $db -x step1 +step1: +sql create database $db keep 36500 +sql use $db +sql create stable $mt (ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int) + +sql create table cq1 as select count(*) from $mt interval(10s); + +sleep 1000 + +sql create table $st using $mt tags(1); + +sql insert into $st values (-50000, 1, 1.0); +sql insert into $st values (-10000, 1, 1.0); +sql insert into $st values (10000, 1, 1.0); + + +$i = 0 +while $i < 12 + sql select * from cq1; + + if $rows != 3 then + sleep 10000 + else + if $data00 != @70-01-01 07:59:10.000@ then + return -1 + endi + if $data01 != 1 then + return -1 + endi + if $data10 != @70-01-01 07:59:50.000@ then + return -1 + endi + if $data11 != 1 then + return -1 + endi + if $data20 != @70-01-01 08:00:10.000@ then + return -1 + endi + if $data21 != 1 then + return -1 + endi + $i = 12 + endi + + $i = $i + 1 +endw +