diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ae25e1bffd8892a3014dfd18167d6e8a038d27b3..8aaf9a79dc5af256cfe089d8fc5f7b12856d2e71 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -84,15 +84,16 @@ typedef struct { } SStreamCheckpoint; static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { - SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosMemoryCalloc(1, sizeof(SStreamDataSubmit)); + SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); if (pDataSubmit == NULL) return NULL; - pDataSubmit->data = pReq; pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); - if (pDataSubmit->data == NULL) goto FAIL; + if (pDataSubmit->dataRef == NULL) goto FAIL; + pDataSubmit->data = pReq; *pDataSubmit->dataRef = 1; + pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT; return pDataSubmit; FAIL: - taosMemoryFree(pDataSubmit); + taosFreeQitem(pDataSubmit); return NULL; } @@ -107,7 +108,6 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) if (ref == 0) { taosMemoryFree(pDataSubmit->data); taosMemoryFree(pDataSubmit->dataRef); - taosFreeQitem(pDataSubmit); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7bfb43bfb16cb016a5fc6eadc9d577a3dbd1d9eb..9941b00ff73cbe95a9a7ead1baf619f910bdef18 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -829,27 +829,15 @@ FAIL: } int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { - void* pIter = NULL; - bool failed = false; + void* pIter = NULL; + bool failed = false; + SStreamDataSubmit* pSubmit = NULL; - SStreamDataSubmit* pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); + pSubmit = streamDataSubmitNew(pReq); if (pSubmit == NULL) { failed = true; - goto SET_TASK_FAIL; - } - pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t)); - if (pSubmit->dataRef == NULL) { - failed = true; - goto SET_TASK_FAIL; } - pSubmit->type = STREAM_INPUT__DATA_SUBMIT; - /*pSubmit->sourceVer = ver;*/ - /*pSubmit->sourceVg = pTq->pVnode->config.vgId;*/ - pSubmit->data = pReq; - *pSubmit->dataRef = 1; - -SET_TASK_FAIL: while (1) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; @@ -864,7 +852,9 @@ SET_TASK_FAIL: } streamDataSubmitRefInc(pSubmit); - taosWriteQitem(pTask->inputQ, pSubmit); + SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); + memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); + taosWriteQitem(pTask->inputQ, pSubmitClone); int8_t execStatus = atomic_load_8(&pTask->status); if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { @@ -887,18 +877,12 @@ SET_TASK_FAIL: } } - if (!failed) { + if (pSubmit) { streamDataSubmitRefDec(pSubmit); - return 0; - } else { - if (pSubmit) { - if (pSubmit->dataRef) { - taosMemoryFree(pSubmit->dataRef); - } - taosFreeQitem(pSubmit); - } - return -1; + taosFreeQitem(pSubmit); } + + return failed ? -1 : 0; } int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 775a185da7b6304e2f9f03d248336db170a82f55..67f7a44a719d24a065dbe4c3a81cc46a1324dc8d 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -166,6 +166,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) // destroy if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { streamDataSubmitRefDec((SStreamDataSubmit*)data); + taosFreeQitem(data); } else { taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); taosFreeQitem(data); @@ -173,6 +174,25 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) return 0; } +static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { + while (1) { + void* data = NULL; + taosGetQitem(pTask->inputQAll, &data); + if (data == NULL) break; + + streamTaskExecImpl(pTask, data, pRes); + + if (taosArrayGetSize(pRes) != 0) { + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + qRes->type = STREAM_INPUT__DATA_BLOCK; + qRes->blocks = pRes; + taosWriteQitem(pTask->outputQ, qRes); + return taosArrayInit(0, sizeof(SSDataBlock)); + } + } + return pRes; +} + // TODO: handle version int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); @@ -182,88 +202,21 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { void* exec = pTask->exec.executor; if (execStatus == TASK_STATUS__IDLE) { // first run, from qall, handle failure from last exec - while (1) { - void* data = NULL; - taosGetQitem(pTask->inputQAll, &data); - if (data == NULL) break; - - streamTaskExecImpl(pTask, data, pRes); - - /*taosFreeQitem(data);*/ - - if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - resQ->type = STREAM_INPUT__DATA_BLOCK; - resQ->blocks = pRes; - taosWriteQitem(pTask->outputQ, resQ); - pRes = taosArrayInit(0, sizeof(SSDataBlock)); - if (pRes == NULL) goto FAIL; - } - } + pRes = streamExecForQall(pTask, pRes); + if (pRes == NULL) goto FAIL; + // second run, from inputQ taosReadAllQitems(pTask->inputQ, pTask->inputQAll); - while (1) { - void* data = NULL; - taosGetQitem(pTask->inputQAll, &data); - if (data == NULL) break; - - streamTaskExecImpl(pTask, data, pRes); - - /*taosFreeQitem(data);*/ - - if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - resQ->type = STREAM_INPUT__DATA_BLOCK; - resQ->blocks = pRes; - taosWriteQitem(pTask->outputQ, resQ); - pRes = taosArrayInit(0, sizeof(SSDataBlock)); - if (pRes == NULL) goto FAIL; - } - } - // set status closing - atomic_store_8(&pTask->status, TASK_STATUS__CLOSING); - // third run, make sure all inputQ is cleared - taosReadAllQitems(pTask->inputQ, pTask->inputQAll); - while (1) { - void* data = NULL; - taosGetQitem(pTask->inputQAll, &data); - if (data == NULL) break; - - streamTaskExecImpl(pTask, data, pRes); - - /*taosFreeQitem(data);*/ - - if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - resQ->type = STREAM_INPUT__DATA_BLOCK; - resQ->blocks = pRes; - taosWriteQitem(pTask->outputQ, resQ); - pRes = taosArrayInit(0, sizeof(SSDataBlock)); - if (pRes == NULL) goto FAIL; - } - } + pRes = streamExecForQall(pTask, pRes); + if (pRes == NULL) goto FAIL; + // set status closing atomic_store_8(&pTask->status, TASK_STATUS__CLOSING); - // third run, make sure all inputQ is cleared + + // third run, make sure inputQ and qall are cleared taosReadAllQitems(pTask->inputQ, pTask->inputQAll); - while (1) { - void* data = NULL; - taosGetQitem(pTask->inputQAll, &data); - if (data == NULL) break; - - streamTaskExecImpl(pTask, data, pRes); - - taosFreeQitem(data); - - if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - resQ->type = STREAM_INPUT__DATA_BLOCK; - resQ->blocks = pRes; - taosWriteQitem(pTask->outputQ, resQ); - pRes = taosArrayInit(0, sizeof(SSDataBlock)); - if (pRes == NULL) goto FAIL; - } - } + pRes = streamExecForQall(pTask, pRes); + if (pRes == NULL) goto FAIL; atomic_store_8(&pTask->status, TASK_STATUS__IDLE); break; diff --git a/tests/pytest/cq.py b/tests/pytest/cq.py deleted file mode 100644 index 7778969619f2d0679c2596581d8d76101d41ed9f..0000000000000000000000000000000000000000 --- a/tests/pytest/cq.py +++ /dev/null @@ -1,169 +0,0 @@ -################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. -# All rights reserved. -# -# This file is proprietary and confidential to TAOS Technologies. -# No part of this file may be reproduced, stored, transmitted, -# disclosed or used in any form or by any means other than as -# expressly provided by the written permission from Jianhui Tao -# -################################################################### - -# -*- coding: utf-8 -*- -import threading -import taos -import sys -import json -import time -import random -# query sql -query_sql = [ -# first supertable -"select count(*) from test.meters ;", -"select count(*) from test.meters where t3 > 2;", -"select count(*) from test.meters where ts <> '2020-05-13 10:00:00.002';", -"select count(*) from test.meters where t7 like 'taos_1%';", -"select count(*) from test.meters where t7 like '_____2';", -"select count(*) from test.meters where t8 like '%思%';", -"select count(*) from test.meters interval(1n) order by ts desc;", -#"select max(c0) from test.meters group by tbname", -"select first(ts) from test.meters where t5 >5000 and t5<5100;", -"select last(ts) from test.meters where t5 >5000 and t5<5100;", -"select last_row(*) from test.meters;", -"select twa(c1) from test.t1 where ts > 1500000001000 and ts < 1500000101000" , -"select avg(c1) from test.meters where t5 >5000 and t5<5100;", -"select bottom(c1, 2) from test.t1;", -"select diff(c1) from test.t1;", -"select leastsquares(c1, 1, 1) from test.t1 ;", -"select max(c1) from test.meters where t5 >5000 and t5<5100;", -"select min(c1) from test.meters where t5 >5000 and t5<5100;", -"select c1 + c2 + c1 / c5 + c4 + c2 from test.t1;", -"select percentile(c1, 50) from test.t1;", -"select spread(c1) from test.t1 ;", -"select stddev(c1) from test.t1;", -"select sum(c1) from test.meters where t5 >5000 and t5<5100;", -"select top(c1, 2) from test.meters where t5 >5000 and t5<5100;" -"select twa(c4) from test.t1 where ts > 1500000001000 and ts < 1500000101000" , -"select avg(c4) from test.meters where t5 >5000 and t5<5100;", -"select bottom(c4, 2) from test.t1 where t5 >5000 and t5<5100;", -"select diff(c4) from test.t1 where t5 >5000 and t5<5100;", -"select leastsquares(c4, 1, 1) from test.t1 ;", -"select max(c4) from test.meters where t5 >5000 and t5<5100;", -"select min(c4) from test.meters where t5 >5000 and t5<5100;", -"select c5 + c2 + c4 / c5 + c4 + c2 from test.t1 ;", -"select percentile(c5, 50) from test.t1;", -"select spread(c5) from test.t1 ;", -"select stddev(c5) from test.t1 where t5 >5000 and t5<5100;", -"select sum(c5) from test.meters where t5 >5000 and t5<5100;", -"select top(c5, 2) from test.meters where t5 >5000 and t5<5100;", -#all vnode -"select count(*) from test.meters where t5 >5000 and t5<5100", -"select max(c0),avg(c1) from test.meters where t5 >5000 and t5<5100", -"select sum(c5),avg(c1) from test.meters where t5 >5000 and t5<5100", -"select max(c0),min(c5) from test.meters where t5 >5000 and t5<5100", -"select min(c0),avg(c5) from test.meters where t5 >5000 and t5<5100", -# second supertable -"select count(*) from test.meters1 where t3 > 2;", -"select count(*) from test.meters1 where ts <> '2020-05-13 10:00:00.002';", -"select count(*) from test.meters where t7 like 'taos_1%';", -"select count(*) from test.meters where t7 like '_____2';", -"select count(*) from test.meters where t8 like '%思%';", -"select count(*) from test.meters1 interval(1n) order by ts desc;", -#"select max(c0) from test.meters1 group by tbname", -"select first(ts) from test.meters1 where t5 >5000 and t5<5100;", -"select last(ts) from test.meters1 where t5 >5000 and t5<5100;", -"select last_row(*) from test.meters1 ;", -"select twa(c1) from test.m1 where ts > 1500000001000 and ts < 1500000101000" , -"select avg(c1) from test.meters1 where t5 >5000 and t5<5100;", -"select bottom(c1, 2) from test.m1 where t5 >5000 and t5<5100;", -"select diff(c1) from test.m1 ;", -"select leastsquares(c1, 1, 1) from test.m1 ;", -"select max(c1) from test.meters1 where t5 >5000 and t5<5100;", -"select min(c1) from test.meters1 where t5 >5000 and t5<5100;", -"select c1 + c2 + c1 / c0 + c2 from test.m1 ;", -"select percentile(c1, 50) from test.m1;", -"select spread(c1) from test.m1 ;", -"select stddev(c1) from test.m1;", -"select sum(c1) from test.meters1 where t5 >5000 and t5<5100;", -"select top(c1, 2) from test.meters1 where t5 >5000 and t5<5100;", -"select twa(c5) from test.m1 where ts > 1500000001000 and ts < 1500000101000" , -"select avg(c5) from test.meters1 where t5 >5000 and t5<5100;", -"select bottom(c5, 2) from test.m1;", -"select diff(c5) from test.m1;", -"select leastsquares(c5, 1, 1) from test.m1 ;", -"select max(c5) from test.meters1 where t5 >5000 and t5<5100;", -"select min(c5) from test.meters1 where t5 >5000 and t5<5100;", -"select c5 + c2 + c4 / c5 + c0 from test.m1;", -"select percentile(c4, 50) from test.m1;", -"select spread(c4) from test.m1 ;", -"select stddev(c4) from test.m1;", -"select sum(c4) from test.meters1 where t5 >5100 and t5<5300;", -"select top(c4, 2) from test.meters1 where t5 >5100 and t5<5300;", -"select count(*) from test.meters1 where t5 >5100 and t5<5300", -#all vnode -"select count(*) from test.meters1 where t5 >5100 and t5<5300", -"select max(c0),avg(c1) from test.meters1 where t5 >5000 and t5<5100", -"select sum(c5),avg(c1) from test.meters1 where t5 >5000 and t5<5100", -"select max(c0),min(c5) from test.meters1 where t5 >5000 and t5<5100", -"select min(c0),avg(c5) from test.meters1 where t5 >5000 and t5<5100", -#join -# "select * from meters,meters1 where meters.ts = meters1.ts and meters.t5 = meters1.t5", -# "select * from meters,meters1 where meters.ts = meters1.ts and meters.t7 = meters1.t7", -# "select * from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8", -# "select meters.ts,meters1.c2 from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8" -] - -class ConcurrentInquiry: - def initConnection(self): - self.numOfTherads = 50 - self.ts=1500000001000 - - def SetThreadsNum(self,num): - self.numOfTherads=num - def query_thread(self,threadID): - host = "10.211.55.14" - user = "root" - password = "taosdata" - conn = taos.connect( - host, - user, - password, - ) - cl = conn.cursor() - cl.execute("use test;") - - print("Thread %d: starting" % threadID) - - while True: - ran_query_sql=query_sql - random.shuffle(ran_query_sql) - for i in ran_query_sql: - print("Thread %d : %s"% (threadID,i)) - try: - start = time.time() - cl.execute(i) - cl.fetchall() - end = time.time() - print("time cost :",end-start) - except Exception as e: - print( - "Failure thread%d, sql: %s,exception: %s" % - (threadID, str(i),str(e))) - exit(-1) - - - print("Thread %d: finishing" % threadID) - - - - def run(self): - - threads = [] - for i in range(self.numOfTherads): - thread = threading.Thread(target=self.query_thread, args=(i,)) - threads.append(thread) - thread.start() - -q = ConcurrentInquiry() -q.initConnection() -q.run() diff --git a/tests/pytest/stream/test2.py b/tests/pytest/stream/test2.py new file mode 100644 index 0000000000000000000000000000000000000000..a441174722047d7fb7819f535fe7b6c7bf55380f --- /dev/null +++ b/tests/pytest/stream/test2.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- + +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.common import tdCom +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + #for i in range(100): + tdSql.prepare() + dbname = tdCom.getLongName(10, "letters") + tdSql.execute('show databases') + tdSql.execute('drop database if exists ttxkbrzmpo') + tdSql.execute('create database if not exists ttxkbrzmpo vgroups 1') + tdSql.execute('use ttxkbrzmpo') + tdSql.execute('create table if not exists downsampling_stb (ts timestamp, c1 int, c2 double, c3 varchar(100), c4 bool) tags (t1 int, t2 double, t3 varchar(100), t4 bool);') + tdSql.execute('create table downsampling_ct1 using downsampling_stb tags(10, 10.1, "Beijing", True);') + tdSql.execute('create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 nchar(20), c5 nchar(20)) tags (t1 int);') + tdSql.execute('create table scalar_ct1 using scalar_stb tags(10);') + tdSql.execute('create stream downsampling_stream into output_downsampling_stb as select _wstartts AS start, min(c1), max(c2), sum(c1) from downsampling_stb interval(10m);') + tdSql.execute('insert into downsampling_ct1 values (1653547828591, 100, 100.1, "Beijing", True);') + tdSql.execute('insert into downsampling_ct1 values (1653547828591+1s, -100, -100.1, "Tianjin", False);') + tdSql.execute('insert into downsampling_ct1 values (1653547828591+2s, 50, 50.3, "HeBei", False);') + tdSql.execute('select * from output_downsampling_stb;') + tdSql.execute('select start, `min(c1)`, `max(c2)`, `sum(c1)` from output_downsampling_stb;') + tdSql.execute('select _wstartts AS start, min(c1), max(c2), sum(c1) from downsampling_stb interval(10m);') + tdSql.execute('insert into downsampling_ct1 values (1653547828591+10m, 60, 60.3, "heilongjiang", True);') + tdSql.execute('insert into downsampling_ct1 values (1653547828591+11m, 70, 70.3, "JiLin", True);') + tdSql.execute('select * from output_downsampling_stb;') + tdSql.execute('select start, `min(c1)`, `max(c2)`, `sum(c1)` from output_downsampling_stb;') + tdSql.execute('select _wstartts AS start, min(c1), max(c2), sum(c1) from downsampling_stb interval(10m);') + tdSql.execute('insert into downsampling_ct1 values (1653547828591+21m, 70, 70.3, "JiLin", True);') + tdSql.execute('select * from output_downsampling_stb;') + tdSql.execute('select * from output_downsampling_stb;') + tdSql.execute('select start, `min(c1)`, `max(c2)`, `sum(c1)` from output_downsampling_stb;') + tdSql.execute('select _wstartts AS start, min(c1), max(c2), sum(c1) from downsampling_stb interval(10m);') + tdSql.execute('create stream abs_stream into output_abs_stb as select ts, abs(c1), abs(c2), c3 from scalar_stb;') + tdSql.query('describe output_abs_stb') + tdSql.execute('create stream acos_stream into output_acos_stb as select ts, acos(c1), acos(c2), c3 from scalar_stb;') + tdSql.query('describe output_acos_stb') + tdSql.execute('create stream asin_stream into output_asin_stb as select ts, asin(c1), asin(c2), c3 from scalar_stb;') + tdSql.query('describe output_asin_stb') + tdSql.execute('create stream atan_stream into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb;') + tdSql.query('describe output_atan_stb') + tdSql.execute('create stream ceil_stream into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb;') + tdSql.query('describe output_ceil_stb') + tdSql.execute('create stream cos_stream into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb;') + tdSql.query('describe output_cos_stb') + tdSql.execute('create stream floor_stream into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb;') + tdSql.query('describe output_floor_stb') + tdSql.execute('create stream log_stream into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb;') + tdSql.query('describe output_log_stb') + tdSql.execute('create stream pow_stream into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb;') + tdSql.query('describe output_pow_stb') + tdSql.execute('create stream round_stream into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb;') + tdSql.query('describe output_round_stb') + tdSql.execute('create stream sin_stream into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb;') + tdSql.query('describe output_sin_stb') + tdSql.execute('create stream sqrt_stream into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb;') + tdSql.query('describe output_sqrt_stb') + tdSql.execute('create stream tan_stream into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb;') + tdSql.query('describe output_tan_stb') + tdSql.execute('create stream char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb;') + tdSql.query('describe output_char_length_stb') + tdSql.execute('create stream concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb;') + tdSql.execute('create stream concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb;') + tdSql.execute('create stream length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb;') + tdSql.query('describe output_length_stb') + tdSql.execute('create stream lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb;') + tdSql.query('describe output_lower_stb') + tdSql.execute('create stream ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb;') + tdSql.query('describe output_ltrim_stb') + tdSql.execute('create stream rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb;') + tdSql.query('describe output_rtrim_stb') + tdSql.execute('create stream substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb;') + tdSql.query('describe output_substr_stb') + tdSql.execute('create stream upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb;') + tdSql.query('describe output_upper_stb') + tdSql.execute('insert into scalar_ct1 values (1653560440733, 100, 100.1, "beijing", "taos", "Taos");') + tdSql.execute('insert into scalar_ct1 values (1653560440733+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");') + tdSql.execute('insert into scalar_ct1 values (1653560440733+2s, 0, Null, "hebei", "TDengine", Null);') + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())