diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index c39ab23811037819eb19c48359bbbe2da6556d75..f214e76cc7604abc9db73d5727917830b85622ef 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -121,7 +121,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { pQueryInfo->window.ekey = pStream->etime; } } else { - pQueryInfo->window.skey = pStream->stime;// - pStream->interval; + pQueryInfo->window.skey = pStream->stime - pStream->interval; int64_t etime = taosGetTimestamp(pStream->precision); // delay to wait all data in last time window if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { @@ -150,7 +150,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf SSqlStream *pStream = (SSqlStream *)param; if (tres == NULL || numOfRows < 0) { int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); - tscError("%p stream:%p, query data failed, code:%d, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows, + tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows, retryDelay); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); @@ -211,7 +211,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf if (pSql == NULL || numOfRows < 0) { int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); - tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); + tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); return; @@ -240,7 +240,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf /* no resuls in the query range, retry */ // todo set retry dynamic time int32_t retry = tsProjectExecInterval; - tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry); + tscError("%p stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry); tscSetRetryTimer(pStream, pStream->pSql, retry); return; @@ -487,7 +487,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream)); if (pStream == NULL) { - tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); + tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code); tscFreeSqlObj(pSql); return NULL; } @@ -512,7 +512,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p if (pRes->code != TSDB_CODE_SUCCESS) { setErrorInfo(pSql, pRes->code, pCmd->payload); - tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); + tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code); tscFreeSqlObj(pSql); return NULL; } @@ -564,6 +564,8 @@ void taos_close_stream(TAOS_STREAM *handle) { taosTmrStopA(&(pStream->pTimer)); tscDebug("%p stream:%p is closed", pSql, pStream); + // notify CQ to release the pStream object + pStream->fp(pStream->param, NULL, NULL); tscFreeSqlObj(pSql); pStream->pSql = NULL; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 4cbe173c41761d0429a8061123b9d2d6c2bf1f50..dfa3eee38ea5a3c473a01d298be8294b0515b6a2 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -244,6 +244,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SCqObj *pObj = (SCqObj *)param; + if (tres == NULL && row == NULL) { + pObj->pStream = NULL; + return; + } SCqContext *pContext = pObj->pContext; STSchema *pSchema = pObj->pSchema; if (pObj->pStream == NULL) return; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index b550106c7047191c22287f6bbdbda875df75dc7d..3d7f6f95a00bb6baea785766c47d2d74db7d6d06 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5822,7 +5822,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); setQueryStatus(pQuery, QUERY_COMPLETED); - + pQInfo->tableqinfoGroupInfo.numOfTables = 0; sem_post(&pQInfo->dataReady); return TSDB_CODE_SUCCESS; } diff --git a/tests/pytest/stream/metric_1.py b/tests/pytest/stream/metric_1.py new file mode 100644 index 0000000000000000000000000000000000000000..b4cccac69c8afe9c637b7a455732572c029258a7 --- /dev/null +++ b/tests/pytest/stream/metric_1.py @@ -0,0 +1,104 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import taos +from util.log import tdLog +from util.cases import tdCases +from util.sql import tdSql + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def createFuncStream(self, expr, suffix, value): + tbname = "strm_" + suffix + tdLog.info("create stream table %s" % tbname) + tdSql.query("select %s from stb interval(1d)" % expr) + tdSql.checkData(0, 1, value) + tdSql.execute("create table %s as select %s from stb interval(1d)" % (tbname, expr)) + + def checkStreamData(self, suffix, value): + sql = "select * from strm_" + suffix + tdSql.waitedQuery(sql, 1, 120) + tdSql.checkData(0, 1, value) + + def run(self): + tbNum = 10 + rowNum = 20 + + tdSql.prepare() + + tdLog.info("===== preparing data =====") + tdSql.execute( + "create table stb(ts timestamp, tbcol int, tbcol2 float) tags(tgcol int)") + for i in range(tbNum): + tdSql.execute("create table tb%d using stb tags(%d)" % (i, i)) + for j in range(rowNum): + tdSql.execute( + "insert into tb%d values (now - %dm, %d, %d)" % + (i, 1440 - j, j, j)) + time.sleep(0.1) + + self.createFuncStream("count(*)", "c1", 200) + self.createFuncStream("count(tbcol)", "c2", 200) + self.createFuncStream("count(tbcol2)", "c3", 200) + self.createFuncStream("avg(tbcol)", "av", 9.5) + self.createFuncStream("sum(tbcol)", "su", 1900) + self.createFuncStream("min(tbcol)", "mi", 0) + self.createFuncStream("max(tbcol)", "ma", 19) + self.createFuncStream("first(tbcol)", "fi", 0) + self.createFuncStream("last(tbcol)", "la", 19) + #tdSql.query("select stddev(tbcol) from stb interval(1d)") + #tdSql.query("select leastsquares(tbcol, 1, 1) from stb interval(1d)") + tdSql.query("select top(tbcol, 1) from stb interval(1d)") + tdSql.query("select bottom(tbcol, 1) from stb interval(1d)") + #tdSql.query("select percentile(tbcol, 1) from stb interval(1d)") + #tdSql.query("select diff(tbcol) from stb interval(1d)") + + tdSql.query("select count(tbcol) from stb where ts < now + 4m interval(1d)") + tdSql.checkData(0, 1, 200) + #tdSql.execute("create table strm_wh as select count(tbcol) from stb where ts < now + 4m interval(1d)") + + self.createFuncStream("count(tbcol)", "as", 200) + + tdSql.query("select count(tbcol) from stb interval(1d) group by tgcol") + tdSql.checkData(0, 1, 20) + + tdSql.query("select count(tbcol) from stb where ts < now + 4m interval(1d) group by tgcol") + tdSql.checkData(0, 1, 20) + + self.checkStreamData("c1", 200) + self.checkStreamData("c2", 200) + self.checkStreamData("c3", 200) + self.checkStreamData("av", 9.5) + self.checkStreamData("su", 1900) + self.checkStreamData("mi", 0) + self.checkStreamData("ma", 19) + self.checkStreamData("fi", 0) + self.checkStreamData("la", 19) + #self.checkStreamData("wh", 200) + self.checkStreamData("as", 200) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) + + diff --git a/tests/pytest/stream/parser.py b/tests/pytest/stream/parser.py index 7a89b926296ebdb12fad56ba17ef0762ed325c13..3b231d2b391a8a5a92cb8924134555117c5bfed2 100644 --- a/tests/pytest/stream/parser.py +++ b/tests/pytest/stream/parser.py @@ -148,23 +148,23 @@ class TDTestCase: def datatypes(self): tdLog.debug("begin data types") tdSql.prepare() - tdSql.execute("create table stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(15), c6 nchar(15), c7 bool) tags(t1 int, t2 binary(15))") - tdSql.execute("create table tb0 using stb tags(0, 'tb0')") - tdSql.execute("create table tb1 using stb tags(1, 'tb1')") - tdSql.execute("create table tb2 using stb tags(2, 'tb2')") - tdSql.execute("create table tb3 using stb tags(3, 'tb3')") - tdSql.execute("create table tb4 using stb tags(4, 'tb4')") - - tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5), last(c6) from stb where ts < now + 30s interval(4s) sliding(2s)") + tdSql.execute("create table stb3 (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(15), c6 nchar(15), c7 bool) tags(t1 int, t2 binary(15))") + tdSql.execute("create table tb0 using stb3 tags(0, 'tb0')") + tdSql.execute("create table tb1 using stb3 tags(1, 'tb1')") + tdSql.execute("create table tb2 using stb3 tags(2, 'tb2')") + tdSql.execute("create table tb3 using stb3 tags(3, 'tb3')") + tdSql.execute("create table tb4 using stb3 tags(4, 'tb4')") + + tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5), last(c6) from stb3 where ts < now + 30s interval(4s) sliding(2s)") #tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5) from stb where ts < now + 30s interval(4s) sliding(2s)") tdLog.sleep(1) tdSql.execute("insert into tb0 values (now, 0, 0, 0, 0, 'binary0', '涛思0', true) tb1 values (now, 1, 1, 1, 1, 'binary1', '涛思1', false) tb2 values (now, 2, 2, 2, 2, 'binary2', '涛思2', true) tb3 values (now, 3, 3, 3, 3, 'binary3', '涛思3', false) tb4 values (now, 4, 4, 4, 4, 'binary4', '涛思4', true) ") - tdSql.waitedQuery("select * from strm0 order by ts desc", 2, 60) + tdSql.waitedQuery("select * from strm0 order by ts desc", 2, 120) tdSql.checkRows(2) tdSql.execute("insert into tb0 values (now, 10, 10, 10, 10, 'binary0', '涛思0', true) tb1 values (now, 11, 11, 11, 11, 'binary1', '涛思1', false) tb2 values (now, 12, 12, 12, 12, 'binary2', '涛思2', true) tb3 values (now, 13, 13, 13, 13, 'binary3', '涛思3', false) tb4 values (now, 14, 14, 14, 14, 'binary4', '涛思4', true) ") - tdSql.waitedQuery("select * from strm0 order by ts desc", 4, 60) + tdSql.waitedQuery("select * from strm0 order by ts desc", 4, 120) tdSql.checkRows(4) def run(self):