From c096da449e4545646a2e80f0328a6405544c09cc Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Thu, 9 Jul 2020 09:19:39 +0800 Subject: [PATCH] fix several bug in stream --- src/client/src/tscStream.c | 14 +++-- src/cq/src/cqMain.c | 4 ++ src/query/src/qExecutor.c | 2 +- tests/pytest/stream/metric_1.py | 104 ++++++++++++++++++++++++++++++++ tests/pytest/stream/parser.py | 20 +++--- 5 files changed, 127 insertions(+), 17 deletions(-) create mode 100644 tests/pytest/stream/metric_1.py diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index c39ab23811..f214e76cc7 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -121,7 +121,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { pQueryInfo->window.ekey = pStream->etime; } } else { - pQueryInfo->window.skey = pStream->stime;// - pStream->interval; + pQueryInfo->window.skey = pStream->stime - pStream->interval; int64_t etime = taosGetTimestamp(pStream->precision); // delay to wait all data in last time window if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { @@ -150,7 +150,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf SSqlStream *pStream = (SSqlStream *)param; if (tres == NULL || numOfRows < 0) { int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); - tscError("%p stream:%p, query data failed, code:%d, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows, + tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows, retryDelay); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); @@ -211,7 +211,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf if (pSql == NULL || numOfRows < 0) { int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); - tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); + tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); return; @@ -240,7 +240,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf /* no resuls in the query range, retry */ // todo set retry dynamic time int32_t retry = tsProjectExecInterval; - tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry); + tscError("%p stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry); tscSetRetryTimer(pStream, pStream->pSql, retry); return; @@ -487,7 +487,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream)); if (pStream == NULL) { - tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); + tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code); tscFreeSqlObj(pSql); return NULL; } @@ -512,7 +512,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p if (pRes->code != TSDB_CODE_SUCCESS) { setErrorInfo(pSql, pRes->code, pCmd->payload); - tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); + tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code); tscFreeSqlObj(pSql); return NULL; } @@ -564,6 +564,8 @@ void taos_close_stream(TAOS_STREAM *handle) { taosTmrStopA(&(pStream->pTimer)); tscDebug("%p stream:%p is closed", pSql, pStream); + // notify CQ to release the pStream object + pStream->fp(pStream->param, NULL, NULL); tscFreeSqlObj(pSql); pStream->pSql = NULL; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 4cbe173c41..dfa3eee38e 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -244,6 +244,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SCqObj *pObj = (SCqObj *)param; + if (tres == NULL && row == NULL) { + pObj->pStream = NULL; + return; + } SCqContext *pContext = pObj->pContext; STSchema *pSchema = pObj->pSchema; if (pObj->pStream == NULL) return; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index b550106c70..3d7f6f95a0 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5822,7 +5822,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); setQueryStatus(pQuery, QUERY_COMPLETED); - + pQInfo->tableqinfoGroupInfo.numOfTables = 0; sem_post(&pQInfo->dataReady); return TSDB_CODE_SUCCESS; } diff --git a/tests/pytest/stream/metric_1.py b/tests/pytest/stream/metric_1.py new file mode 100644 index 0000000000..b4cccac69c --- /dev/null +++ b/tests/pytest/stream/metric_1.py @@ -0,0 +1,104 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import taos +from util.log import tdLog +from util.cases import tdCases +from util.sql import tdSql + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def createFuncStream(self, expr, suffix, value): + tbname = "strm_" + suffix + tdLog.info("create stream table %s" % tbname) + tdSql.query("select %s from stb interval(1d)" % expr) + tdSql.checkData(0, 1, value) + tdSql.execute("create table %s as select %s from stb interval(1d)" % (tbname, expr)) + + def checkStreamData(self, suffix, value): + sql = "select * from strm_" + suffix + tdSql.waitedQuery(sql, 1, 120) + tdSql.checkData(0, 1, value) + + def run(self): + tbNum = 10 + rowNum = 20 + + tdSql.prepare() + + tdLog.info("===== preparing data =====") + tdSql.execute( + "create table stb(ts timestamp, tbcol int, tbcol2 float) tags(tgcol int)") + for i in range(tbNum): + tdSql.execute("create table tb%d using stb tags(%d)" % (i, i)) + for j in range(rowNum): + tdSql.execute( + "insert into tb%d values (now - %dm, %d, %d)" % + (i, 1440 - j, j, j)) + time.sleep(0.1) + + self.createFuncStream("count(*)", "c1", 200) + self.createFuncStream("count(tbcol)", "c2", 200) + self.createFuncStream("count(tbcol2)", "c3", 200) + self.createFuncStream("avg(tbcol)", "av", 9.5) + self.createFuncStream("sum(tbcol)", "su", 1900) + self.createFuncStream("min(tbcol)", "mi", 0) + self.createFuncStream("max(tbcol)", "ma", 19) + self.createFuncStream("first(tbcol)", "fi", 0) + self.createFuncStream("last(tbcol)", "la", 19) + #tdSql.query("select stddev(tbcol) from stb interval(1d)") + #tdSql.query("select leastsquares(tbcol, 1, 1) from stb interval(1d)") + tdSql.query("select top(tbcol, 1) from stb interval(1d)") + tdSql.query("select bottom(tbcol, 1) from stb interval(1d)") + #tdSql.query("select percentile(tbcol, 1) from stb interval(1d)") + #tdSql.query("select diff(tbcol) from stb interval(1d)") + + tdSql.query("select count(tbcol) from stb where ts < now + 4m interval(1d)") + tdSql.checkData(0, 1, 200) + #tdSql.execute("create table strm_wh as select count(tbcol) from stb where ts < now + 4m interval(1d)") + + self.createFuncStream("count(tbcol)", "as", 200) + + tdSql.query("select count(tbcol) from stb interval(1d) group by tgcol") + tdSql.checkData(0, 1, 20) + + tdSql.query("select count(tbcol) from stb where ts < now + 4m interval(1d) group by tgcol") + tdSql.checkData(0, 1, 20) + + self.checkStreamData("c1", 200) + self.checkStreamData("c2", 200) + self.checkStreamData("c3", 200) + self.checkStreamData("av", 9.5) + self.checkStreamData("su", 1900) + self.checkStreamData("mi", 0) + self.checkStreamData("ma", 19) + self.checkStreamData("fi", 0) + self.checkStreamData("la", 19) + #self.checkStreamData("wh", 200) + self.checkStreamData("as", 200) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) + + diff --git a/tests/pytest/stream/parser.py b/tests/pytest/stream/parser.py index 7a89b92629..3b231d2b39 100644 --- a/tests/pytest/stream/parser.py +++ b/tests/pytest/stream/parser.py @@ -148,23 +148,23 @@ class TDTestCase: def datatypes(self): tdLog.debug("begin data types") tdSql.prepare() - tdSql.execute("create table stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(15), c6 nchar(15), c7 bool) tags(t1 int, t2 binary(15))") - tdSql.execute("create table tb0 using stb tags(0, 'tb0')") - tdSql.execute("create table tb1 using stb tags(1, 'tb1')") - tdSql.execute("create table tb2 using stb tags(2, 'tb2')") - tdSql.execute("create table tb3 using stb tags(3, 'tb3')") - tdSql.execute("create table tb4 using stb tags(4, 'tb4')") - - tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5), last(c6) from stb where ts < now + 30s interval(4s) sliding(2s)") + tdSql.execute("create table stb3 (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(15), c6 nchar(15), c7 bool) tags(t1 int, t2 binary(15))") + tdSql.execute("create table tb0 using stb3 tags(0, 'tb0')") + tdSql.execute("create table tb1 using stb3 tags(1, 'tb1')") + tdSql.execute("create table tb2 using stb3 tags(2, 'tb2')") + tdSql.execute("create table tb3 using stb3 tags(3, 'tb3')") + tdSql.execute("create table tb4 using stb3 tags(4, 'tb4')") + + tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5), last(c6) from stb3 where ts < now + 30s interval(4s) sliding(2s)") #tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5) from stb where ts < now + 30s interval(4s) sliding(2s)") tdLog.sleep(1) tdSql.execute("insert into tb0 values (now, 0, 0, 0, 0, 'binary0', '涛思0', true) tb1 values (now, 1, 1, 1, 1, 'binary1', '涛思1', false) tb2 values (now, 2, 2, 2, 2, 'binary2', '涛思2', true) tb3 values (now, 3, 3, 3, 3, 'binary3', '涛思3', false) tb4 values (now, 4, 4, 4, 4, 'binary4', '涛思4', true) ") - tdSql.waitedQuery("select * from strm0 order by ts desc", 2, 60) + tdSql.waitedQuery("select * from strm0 order by ts desc", 2, 120) tdSql.checkRows(2) tdSql.execute("insert into tb0 values (now, 10, 10, 10, 10, 'binary0', '涛思0', true) tb1 values (now, 11, 11, 11, 11, 'binary1', '涛思1', false) tb2 values (now, 12, 12, 12, 12, 'binary2', '涛思2', true) tb3 values (now, 13, 13, 13, 13, 'binary3', '涛思3', false) tb4 values (now, 14, 14, 14, 14, 'binary4', '涛思4', true) ") - tdSql.waitedQuery("select * from strm0 order by ts desc", 4, 60) + tdSql.waitedQuery("select * from strm0 order by ts desc", 4, 120) tdSql.checkRows(4) def run(self): -- GitLab