diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index c8754e5bebd7a460ed3e33a5e56a1e535dbf7035..a398ad659e4596df1ed9538b6c7a3750eb0f20d8 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -320,6 +320,8 @@ typedef struct SSqlStream { SSqlObj *pSql; uint32_t streamId; char listed; + bool isProject; + int16_t precision; int64_t num; // number of computing count /* @@ -334,7 +336,6 @@ typedef struct SSqlStream { int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed int64_t interval; int64_t slidingTime; - int16_t precision; void * pTimer; void (*fp)(); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index cc703e0f9338f50f765d6c1679ca2f541930a1d8..e2cdff2d032079e0d6af07df1f075d69c435c43b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -71,6 +71,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { pSql->fp = tscProcessStreamQueryCallback; pSql->param = pStream; + pSql->res.completed = false; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -86,7 +87,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { // failed to get meter/metric meta, retry in 10sec. if (code != TSDB_CODE_SUCCESS) { int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); - tscError("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); + tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); tscSetRetryTimer(pStream, pSql, retryDelayTime); } else { @@ -108,7 +109,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); tscDebug("%p add into timer", pSql); - if (isProjectStream(pQueryInfo)) { + if (pStream->isProject) { /* * pQueryInfo->window.ekey, which is the start time, does not change in case of * repeat first execution, once the first execution failed. @@ -121,7 +122,6 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { } } else { pQueryInfo->window.skey = pStream->stime - pStream->interval; - pQueryInfo->window.ekey = pStream->stime - 1; } // launch stream computing in a new thread @@ -180,16 +180,11 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful. pStream->numOfRes += numOfRows; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); for(int32_t i = 0; i < numOfRows; ++i) { TAOS_ROW row = taos_fetch_row(res); tscDebug("%p stream:%p fetch result", pSql, pStream); - if (isProjectStream(pQueryInfo)) { - pStream->stime = *(TSKEY *)row[0]; - } else { - tscSetTimestampForRes(pStream, pSql); - } + pStream->stime = *(TSKEY *)row[0]; // user callback function (*pStream->fp)(pStream->param, res, row); @@ -225,9 +220,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf tscSetTimestampForRes(pStream, pSql); row[0] = pRes->data; - // char result[512] = {0}; - // taos_print_row(result, row, pQueryInfo->fieldsInfo.pFields, pQueryInfo->fieldsInfo.numOfOutput); - // tscInfo("%p stream:%p query result: %s", pSql, pStream, result); tscDebug("%p stream:%p fetch result", pSql, pStream); // user callback function @@ -235,7 +227,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf pRes->numOfRows = 0; pRes->data = oldPtr; - } else if (isProjectStream(pQueryInfo)) { + } else if (pStream->isProject) { /* no resuls in the query range, retry */ // todo set retry dynamic time int32_t retry = tsProjectExecInterval; @@ -245,7 +237,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf return; } } else { - if (isProjectStream(pQueryInfo)) { + if (pStream->isProject) { pStream->stime += 1; } } @@ -262,10 +254,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf } static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); int64_t delay = getDelayValueAfterTimewindowClosed(pStream, timer); - if (isProjectStream(pQueryInfo)) { + if (pStream->isProject) { int64_t now = taosGetTimestamp(pStream->precision); int64_t etime = now > pStream->etime ? pStream->etime : now; @@ -323,8 +314,7 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { int64_t timer = 0; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - if (isProjectStream(pQueryInfo)) { + if (pStream->isProject) { /* * for project query, no mater fetch data successfully or not, next launch will issue * more than the sliding time window @@ -342,7 +332,6 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { return; } } else { - pStream->stime += pStream->slidingTime; if ((pStream->stime - pStream->interval) >= pStream->etime) { tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream, pStream->stime, pStream->etime); @@ -409,14 +398,16 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { pStream->slidingTime = pQueryInfo->slidingTime; - pQueryInfo->intervalTime = 0; // clear the interval value to avoid the force time window split by query processor - pQueryInfo->slidingTime = 0; + if (pStream->isProject) { + pQueryInfo->intervalTime = 0; // clear the interval value to avoid the force time window split by query processor + pQueryInfo->slidingTime = 0; + } } static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - if (isProjectStream(pQueryInfo)) { + if (pStream->isProject) { // no data in table, flush all data till now to destination meter, 10sec delay pStream->interval = tsProjectExecInterval; pStream->slidingTime = tsProjectExecInterval; @@ -430,6 +421,8 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in } else { // timewindow based aggregation stream if (stime == 0) { // no data in meter till now stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval; + const char* fmtts(int64_t); + printf("stream start time is: %s\n", fmtts(stime)); tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime); } else { int64_t newStime = (stime / pStream->interval) * pStream->interval; @@ -523,6 +516,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); + pStream->isProject = isProjectStream(pQueryInfo); pStream->fp = fp; pStream->callback = callback; pStream->param = param; diff --git a/tests/pytest/parser/__init__.py b/tests/pytest/parser/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/pytest/parser/stream.py b/tests/pytest/parser/stream.py new file mode 100644 index 0000000000000000000000000000000000000000..ce55c54415db7aef661bf4e92b70f1047f35ef0d --- /dev/null +++ b/tests/pytest/parser/stream.py @@ -0,0 +1,205 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import taos +from util.log import tdLog +from util.cases import tdCases +from util.sql import tdSql + + +class TDTestCase: + def sleep(self, duration): + tdLog.info("sleeping %f seconds" % duration) + time.sleep(duration) + + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def tbase300(self): + tdLog.debug("begin tbase300") + + tdSql.prepare() + tdSql.execute("create table mt(ts timestamp, c1 int, c2 int) tags(t1 int)") + tdSql.execute("create table tb1 using mt tags(1)"); + tdSql.execute("create table tb2 using mt tags(2)"); + tdSql.execute("create table strm as select count(*), avg(c1), sum(c2), max(c1), min(c2),first(c1), last(c2) from mt interval(4s) sliding(2s)") + #tdSql.execute("create table strm as select count(*), avg(c1), sum(c2), max(c1), min(c2), first(c1) from mt interval(4s) sliding(2s)") + self.sleep(10) + tdSql.execute("insert into tb2 values(now, 1, 1)"); + tdSql.execute("insert into tb1 values(now, 1, 1)"); + self.sleep(4) + tdSql.query("select * from mt") + tdSql.query("select * from strm") + tdSql.execute("drop table tb1") + + tdLog.info("retrieve data from strm") + for i in range(100): + time.sleep(1) + tdSql.query("select * from strm") + if tdSql.queryRows > 0: + break + if tdSql.queryRows < 1 or tdSql.queryRows > 2: + tdLog.exit("rows should be 1 or 2") + + tdSql.execute("drop table tb2") + tdSql.execute("drop table mt") + tdSql.execute("drop table strm") + + def tbase304(self): + tdLog.debug("begin tbase304") + # we cannot reset query cache in server side, as a workaround, + # set super table name to mt304, need to change back to mt later + tdSql.execute("create table mt304 (ts timestamp, c1 int) tags(t1 int, t2 int)") + tdSql.execute("create table tb1 using mt304 tags(1, 1)") + tdSql.execute("create table tb2 using mt304 tags(1, -1)") + time.sleep(0.1) + tdSql.execute("create table strm as select count(*), avg(c1) from mt304 where t2 >= 0 interval(4s) sliding(2s)") + tdSql.execute("insert into tb1 values (now,1)") + tdSql.execute("insert into tb2 values (now,2)") + + tdLog.info("retrieve data from strm") + for i in range(100): + time.sleep(1) + tdSql.query("select * from strm") + if tdSql.queryRows > 0: + break + if tdSql.queryRows < 1 or tdSql.queryRows > 2: + tdLog.exit("rows should be 1 or 2") + + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 1.000000000) + tdSql.execute("alter table mt304 drop tag t2") + tdSql.execute("insert into tb2 values (now,2)") + tdSql.execute("insert into tb1 values (now,1)") + tdSql.query("select * from strm") + tdSql.execute("alter table mt304 add tag t2 int") + self.sleep(1) + tdSql.query("select * from strm") + + def wildcardFilterOnTags(self): + tdLog.debug("begin wildcardFilterOnTag") + tdSql.prepare() + tdSql.execute("create table stb (ts timestamp, c1 int, c2 binary(10)) tags(t1 binary(10))") + tdSql.execute("create table tb1 using stb tags('a1')") + tdSql.execute("create table tb2 using stb tags('b2')") + tdSql.execute("create table tb3 using stb tags('a3')") + tdSql.execute("create table strm as select count(*), avg(c1), first(c2) from stb where t1 like 'a%' interval(4s) sliding(2s)") + tdSql.query("describe strm") + tdSql.checkRows(4) + + self.sleep(1) + tdSql.execute("insert into tb1 values (now, 0, 'tb1')") + self.sleep(4) + tdSql.execute("insert into tb2 values (now, 2, 'tb2')") + self.sleep(4) + tdSql.execute("insert into tb3 values (now, 0, 'tb3')") + + tdLog.info("retrieve data from strm") + for i in range(60): + time.sleep(1) + tdSql.query("select * from strm") + if tdSql.queryRows == 4: + break + + tdSql.checkRows(4) + tdSql.checkData(0, 2, 0.000000000) + if tdSql.getData(0, 3) == 'tb2': + tdLog.exit("unexpected value of data03") + if tdSql.getData(1, 3) == 'tb2': + tdLog.exit("unexpected value of data13") + if tdSql.getData(2, 3) == 'tb2': + tdLog.exit("unexpected value of data23") + if tdSql.getData(3, 3) == 'tb2': + tdLog.exit("unexpected value of data33") + + tdLog.info("add table tb4 to see if stream still works correctly") + # The vnode client needs to refresh metadata cache to allow strm calculate tb4's data. + # But the current refreshing frequency is every 10 min + # commented out the case below to save running time + tdSql.execute("create table tb4 using stb tags('a4')") + tdSql.execute("insert into tb4 values(now, 4, 'tb4')") + tdLog.info("retrieve data from strm order by ts desc") + for i in range(60): + time.sleep(1) + tdSql.query("select * from strm order by ts desc") + if tdSql.queryRows == 6: + break + tdSql.checkRows(6) + tdSql.checkData(0, 2, 4) + tdSql.checkData(0, 3, "tb4") + + tdLog.info("change tag values to see if stream still works correctly") + tdSql.execute("alter table tb4 set tag t1='b4'") + self.sleep(3) + tdSql.execute("insert into tb1 values (now, 1, 'tb1_a1')") + self.sleep(4) + tdSql.execute("insert into tb4 values (now, -4, 'tb4_b4')") + tdLog.info("retrieve data from strm order by ts desc") + for i in range(100): + time.sleep(1) + tdSql.query("select * from strm order by ts desc") + if tdSql.queryRows == 8: + break + tdSql.checkRows(8) + tdSql.checkData(0, 2, 1) + tdSql.checkData(0, 3, "tb1_a1") + + def datatypes(self): + tdLog.debug("begin data types") + tdSql.prepare() + tdSql.execute("create table stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(15), c6 nchar(15), c7 bool) tags(t1 int, t2 binary(15))") + tdSql.execute("create table tb0 using stb tags(0, 'tb0')") + tdSql.execute("create table tb1 using stb tags(1, 'tb1')") + tdSql.execute("create table tb2 using stb tags(2, 'tb2')") + tdSql.execute("create table tb3 using stb tags(3, 'tb3')") + tdSql.execute("create table tb4 using stb tags(4, 'tb4')") + + tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5), last(c6) from stb where ts < now + 30s interval(4s) sliding(2s)") + #tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5) from stb where ts < now + 30s interval(4s) sliding(2s)") + self.sleep(1) + tdSql.execute("insert into tb0 values (now, 0, 0, 0, 0, 'binary0', '涛思0', true) tb1 values (now, 1, 1, 1, 1, 'binary1', '涛思1', false) tb2 values (now, 2, 2, 2, 2, 'binary2', '涛思2', true) tb3 values (now, 3, 3, 3, 3, 'binary3', '涛思3', false) tb4 values (now, 4, 4, 4, 4, 'binary4', '涛思4', true) ") + + tdLog.info("retrieve data from strm0") + for i in range(60): + time.sleep(1) + tdSql.query("select * from strm0 order by ts desc") + if tdSql.queryRows == 2: + break + tdSql.checkRows(2) + + tdSql.execute("insert into tb0 values (now, 10, 10, 10, 10, 'binary0', '涛思0', true) tb1 values (now, 11, 11, 11, 11, 'binary1', '涛思1', false) tb2 values (now, 12, 12, 12, 12, 'binary2', '涛思2', true) tb3 values (now, 13, 13, 13, 13, 'binary3', '涛思3', false) tb4 values (now, 14, 14, 14, 14, 'binary4', '涛思4', true) ") + tdLog.info("retrieve data from strm0 again") + for i in range(60): + time.sleep(1) + tdSql.query("select * from strm0 order by ts desc") + if tdSql.queryRows == 4: + break + tdSql.checkRows(4) + + def run(self): + self.tbase300() + #time.sleep(10) + self.tbase304() + #self.wildcardFilterOnTags() + self.datatypes() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())