未验证 提交 3a904846 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #14448 from taosdata/fix/tsim

fix: drop sma
......@@ -21,7 +21,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) {
if (pVnode == NULL || pVnode->dropped) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
......@@ -81,16 +81,18 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
vmReleaseVnode(pMgmt, pVnode);
while (pVnode->refCount > 0) taosMsleep(10);
dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
dTrace("vgId:%d, vnode-fetch queue is empty", pVnode->vgId);
vmFreeQueue(pMgmt, pVnode);
vnodeClose(pVnode->pImpl);
pVnode->pImpl = NULL;
dDebug("vgId:%d, vnode is closed", pVnode->vgId);
if (pVnode->dropped) {
......
......@@ -146,8 +146,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(),
TMSG_INFO(pMsg->msgType));
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, msgtype:%s qtype:%d", pHead->vgId, pMsg,
terrstr(), TMSG_INFO(pMsg->msgType), qtype);
return terrno != 0 ? terrno : -1;
}
......
......@@ -38,6 +38,8 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
#ifdef __cplusplus
}
#endif
......
......@@ -21,6 +21,7 @@
#include "mndShow.h"
#include "mndSma.h"
#include "mndStb.h"
#include "mndStream.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
......@@ -927,6 +928,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
......@@ -947,7 +949,6 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
mndTransSetRpcRsp(pTrans, pRsp, rspLen);
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
......
......@@ -523,6 +523,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.sourceDbUid = pDb->uid;
streamObj.targetDbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
streamObj.smaId = smaObj.uid;
......@@ -854,35 +855,25 @@ _OVER:
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
SSmaObj *pSma = NULL;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
int32_t code = -1;
while (1) {
SSmaObj *pSma = NULL;
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break;
if (pSma->dbUid == pDb->uid) {
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
if (pVgroup == NULL) goto _OVER;
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
mndReleaseVgroup(pMnode, pVgroup);
pVgroup = NULL;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) {
sdbRelease(pSdb, pSma);
sdbCancelFetch(pSdb, pSma);
return -1;
}
}
sdbRelease(pSdb, pSma);
}
code = 0;
_OVER:
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pSma);
mndReleaseVgroup(pMnode, pVgroup);
return code;
return 0;
}
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
......
......@@ -674,27 +674,29 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SStreamObj *pStream = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
if (pStream->sourceDbUid != pStream->targetDbUid) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
return -1;
} else {
// TODO drop all task on snode
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
return -1;
}
}
} else {
sdbRelease(pSdb, pStream);
continue;
}
#if 0
......
......@@ -90,7 +90,7 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
for (int32_t i = 0; i < actionNum; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction->actionType == TRANS_ACTION_RAW) {
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
rawDataLen += (sizeof(STransAction) + sdbGetRawTotalSize(pAction->pRaw));
} else if (pAction->actionType == TRANS_ACTION_MSG) {
rawDataLen += (sizeof(STransAction) + pAction->contLen);
} else {
......@@ -105,7 +105,7 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE + pTrans->paramLen;
rawDataLen += mndTransGetActionsSize(pTrans->redoActions);
rawDataLen += mndTransGetActionsSize(pTrans->undoActions);
rawDataLen += mndTransGetActionsSize(pTrans->commitActions);
......@@ -226,7 +226,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
_OVER:
if (terrno != 0) {
mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr());
mError("trans:%d, failed to encode to raw:%p maxlen:%d len:%d since %s", pTrans->id, pRaw, sdbGetRawTotalSize(pRaw),
dataPos, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
......
......@@ -185,5 +185,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
.contLen = ntohl(pReq->length),
};
ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
......@@ -35,7 +35,7 @@ if (${BUILD_WITH_INVERTEDINDEX})
endif(${BUILD_WITH_INVERTEDINDEX})
if (${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
# if (${BUILD_TEST})
# add_subdirectory(test)
# endif(${BUILD_TEST})
......@@ -93,6 +93,7 @@
./test.sh -f tsim/stream/basic0.sim
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
# ./test.sh -f tsim/stream/distributesession0.sim
......@@ -159,6 +160,7 @@
#./test.sh -f tsim/mnode/basic1.sim -m
# --- sma
./test.sh -f tsim/sma/drop_sma.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
......
......@@ -135,7 +135,7 @@ echo "jniDebugFlag 143" >> $TAOS_CFG
echo "qDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 143" >> $TAOS_CFG
echo "uDebugFlag 131" >> $TAOS_CFG
echo "sDebugFlag 143" >> $TAOS_CFG
echo "wDebugFlag 143" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG
......
......@@ -64,12 +64,59 @@ sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) in
print --> drop stb
sql drop table stb;
print ========== step5
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint, c_float float, c_double double, c_bool bool, c_binary binary(16), c_nchar nchar(32), c_ts timestamp, c_tint_un tinyint unsigned, c_sint_un smallint unsigned, c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
print ========== step6 repeat
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint ) tags (t_int int);
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
print ========== step7
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint,c_float float, c_double double, c_bool bool,c_binary binary(16), c_nchar nchar(32), c_ts timestamp,c_tint_un tinyint unsigned, c_sint_un smallint unsigned,c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
sql create table ct1 using stb1 tags ( 1 );
sql create table ct2 using stb1 tags ( 2 );
sql create table ct3 using stb1 tags ( 3 );
sql create table ct4 using stb1 tags ( 4 );
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
sql CREATE SMA INDEX sma_index_2 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) max_delay 6m;
sql CREATE SMA INDEX sma_index_3 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) watermark 5s max_delay 6m;
sql DROP INDEX sma_index_1 ;
sql DROP INDEX sma_index_2 ;
sql DROP INDEX sma_index_3 ;
print ========== step8
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint,c_float float, c_double double, c_bool bool,c_binary binary(16), c_nchar nchar(32), c_ts timestamp,c_tint_un tinyint unsigned, c_sint_un smallint unsigned,c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
sql create table ct1 using stb1 tags ( 1 );
sql create table ct2 using stb1 tags ( 2 );
sql create table ct3 using stb1 tags ( 3 );
sql create table ct4 using stb1 tags ( 4 );
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
sql CREATE SMA INDEX sma_index_2 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) max_delay 6m;
sql CREATE SMA INDEX sma_index_3 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) watermark 5s max_delay 6m;
sql DROP INDEX sma_index_1 ;
sql DROP INDEX sma_index_2 ;
sql DROP INDEX sma_index_3 ;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/cfg.sh -n dnode2 -c supportVnodes -v 4
system sh/cfg.sh -n dnode3 -c supportVnodes -v 4
print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect
print ========== step2
sql create dnode $hostname port 7200
system sh/exec.sh -n dnode2 -s start
$x = 0
step2:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step2
endi
if $data(2)[4] != ready then
goto step2
endi
print ========== step3
sql drop database if exists test;
sql create database if not exists test vgroups 1 precision "ms" ;
sql use test;
sql create table test.scalar_function_stb (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 binary(256), c12 nchar(256), c13 bool) tags (t1 tinyint, t2 smallint, t3 int, t4 bigint, t5 tinyint unsigned, t6 smallint unsigned, t7 int unsigned, t8 bigint unsigned, t9 float, t10 double, t11 binary(256), t12 nchar(256), t13 bool) ;
sql create table scalar_function_ct1 using scalar_function_stb tags (-38, -32456, 509722288, -1404014954778348330, 87, 8879, 3351927345, 1840080781675115605, 3.002364316200592e+38, 6.698140580387119e+37, "bktezshfyvmrmgzwrwerytfwudlblkyyxismpommiqpqsptpiucptwqutzhajxbiitqxkrpobqhgqvjlvgsudewmelpunjspurbpbbwypvgbwjfrwidrchnojtxyhrwfjwgdiabzfoujxkwcjjxjqsrnhmryjhrykldmdfiwircdfahldtrtuafzvybkihyjatiqivbtpydjtmbfddcgyzjuqidwcchtsamnwyqwvajftayyvfrmqcqygbxmxgjx", "ddlxkxhrvviwnjeqhewbercnlontwbsyevcjsocrwyupautsjkdzqbwuzsuetptgsdfyjzfkqyobkysikpaxtqqonxtocfowaehgovshwyciyzfmdmcmwaolkhdunfhwhcanetepxyppuullxnclockmadyaaufywllwburgsfxizcjgzvboydpqymlwgktslclidbcwiyyubyuvhjgwldkgxswigjkpbpslvlsbigdlmuldmtbqencbntbaohxr", False) ;
sql create table test.scalar_function_tb1 (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 binary(256), c12 nchar(256), c13 bool) ;
sql create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20)) tags (t1 int);
sql create table scalar_ct1 using scalar_stb tags(10);
sql create table if not exists scalar_tb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20));
sql create stream stb_abs_stream trigger at_once into output_abs_stb as select ts, abs(c1), abs(c2), c3 from scalar_stb;
sql create stream ctb_abs_stream trigger at_once into output_abs_ctb as select ts, abs(c1), abs(c2), c3 from scalar_ct1;
sql create stream tb_abs_stream trigger at_once into output_abs_tb as select ts, abs(c1), abs(c2), c3 from scalar_tb;
sql create stream stb_acos_stream trigger at_once into output_acos_stb as select ts, acos(c1), acos(c2), c3 from scalar_stb;
sql create stream ctb_acos_stream trigger at_once into output_acos_ctb as select ts, acos(c1), acos(c2), c3 from scalar_ct1;
sql create stream tb_acos_stream trigger at_once into output_acos_tb as select ts, acos(c1), acos(c2), c3 from scalar_tb;
sql create stream stb_asin_stream trigger at_once into output_asin_stb as select ts, asin(c1), asin(c2), c3 from scalar_stb;
sql create stream ctb_asin_stream trigger at_once into output_asin_ctb as select ts, asin(c1), asin(c2), c3 from scalar_ct1;
sql create stream tb_asin_stream trigger at_once into output_asin_tb as select ts, asin(c1), asin(c2), c3 from scalar_tb;
sql create stream stb_atan_stream trigger at_once into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb;
sql create stream ctb_atan_stream trigger at_once into output_atan_ctb as select ts, atan(c1), atan(c2), c3 from scalar_ct1;
sql create stream tb_atan_stream trigger at_once into output_atan_tb as select ts, atan(c1), atan(c2), c3 from scalar_tb;
sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb;
sql create stream ctb_ceil_stream trigger at_once into output_ceil_ctb as select ts, ceil(c1), ceil(c2), c3 from scalar_ct1;
sql create stream tb_ceil_stream trigger at_once into output_ceil_tb as select ts, ceil(c1), ceil(c2), c3 from scalar_tb;
sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb;
sql create stream ctb_cos_stream trigger at_once into output_cos_ctb as select ts, cos(c1), cos(c2), c3 from scalar_ct1;
sql create stream tb_cos_stream trigger at_once into output_cos_tb as select ts, cos(c1), cos(c2), c3 from scalar_tb;
sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb;
sql create stream ctb_floor_stream trigger at_once into output_floor_ctb as select ts, floor(c1), floor(c2), c3 from scalar_ct1;
sql create stream tb_floor_stream trigger at_once into output_floor_tb as select ts, floor(c1), floor(c2), c3 from scalar_tb;
sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb;
sql create stream ctb_log_stream trigger at_once into output_log_ctb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_ct1;
sql create stream tb_log_stream trigger at_once into output_log_tb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_tb;
sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb;
sql create stream ctb_pow_stream trigger at_once into output_pow_ctb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_ct1;
sql create stream tb_pow_stream trigger at_once into output_pow_tb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_tb;
sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb;
sql create stream ctb_round_stream trigger at_once into output_round_ctb as select ts, round(c1), round(c2), c3 from scalar_ct1;
sql create stream tb_round_stream trigger at_once into output_round_tb as select ts, round(c1), round(c2), c3 from scalar_tb;
sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb;
sql create stream ctb_sin_stream trigger at_once into output_sin_ctb as select ts, sin(c1), sin(c2), c3 from scalar_ct1;
sql create stream tb_sin_stream trigger at_once into output_sin_tb as select ts, sin(c1), sin(c2), c3 from scalar_tb;
sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb;
sql create stream ctb_sqrt_stream trigger at_once into output_sqrt_ctb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_ct1;
sql create stream tb_sqrt_stream trigger at_once into output_sqrt_tb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_tb;
sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb;
sql create stream ctb_tan_stream trigger at_once into output_tan_ctb as select ts, tan(c1), tan(c2), c3 from scalar_ct1;
sql create stream tb_tan_stream trigger at_once into output_tan_tb as select ts, tan(c1), tan(c2), c3 from scalar_tb;
sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb;
sql create stream ctb_char_length_stream into output_char_length_ctb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_ct1;
sql create stream tb_char_length_stream into output_char_length_tb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_tb;
sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb;
sql create stream ctb_concat_stream into output_concat_ctb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_stream into output_concat_tb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_tb;
sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb;
sql create stream ctb_concat_ws_stream into output_concat_ws_ctb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_ws_stream into output_concat_ws_tb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_tb;
sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb;
sql create stream ctb_length_stream into output_length_ctb as select ts, length(c3), length(c4), length(c5) from scalar_ct1;
sql create stream tb_length_stream into output_length_tb as select ts, length(c3), length(c4), length(c5) from scalar_tb;
sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb;
sql create stream ctb_lower_stream into output_lower_ctb as select ts, lower(c3), lower(c4), lower(c5) from scalar_ct1;
sql create stream tb_lower_stream into output_lower_tb as select ts, lower(c3), lower(c4), lower(c5) from scalar_tb;
sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb;
sql create stream ctb_ltrim_stream into output_ltrim_ctb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_ct1;
sql create stream tb_ltrim_stream into output_ltrim_tb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_tb;
sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb;
sql create stream ctb_rtrim_stream into output_rtrim_ctb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_ct1;
sql create stream tb_rtrim_stream into output_rtrim_tb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_tb;
sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb;
sql create stream ctb_substr_stream into output_substr_ctb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_ct1;
sql create stream tb_substr_stream into output_substr_tb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_tb;
sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb;
sql create stream ctb_upper_stream into output_upper_ctb as select ts, upper(c3), upper(c4), upper(c5) from scalar_ct1;
sql create stream tb_upper_stream into output_upper_tb as select ts, upper(c3), upper(c4), upper(c5) from scalar_tb;
sql insert into scalar_ct1 values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");
sql insert into scalar_ct1 values (1656668180503+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");
sql insert into scalar_ct1 values (1656668180503+2s, 0, Null, "hebei", "TDengine", Null);
sql insert into scalar_tb values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");
sql insert into scalar_tb values (1656668180503+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");
sql insert into scalar_tb values (1656668180503+2s, 0, Null, "hebei", "TDengine", Null);
sql insert into scalar_ct1 values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_ct1 values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
print ========== step4
sql drop database test;
print ========== step5 repeat
sql drop database if exists test;
sql create database if not exists test vgroups 1 precision "ms" ;
sql use test;
sql create table test.scalar_function_stb (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 binary(256), c12 nchar(256), c13 bool) tags (t1 tinyint, t2 smallint, t3 int, t4 bigint, t5 tinyint unsigned, t6 smallint unsigned, t7 int unsigned, t8 bigint unsigned, t9 float, t10 double, t11 binary(256), t12 nchar(256), t13 bool) ;
sql create table scalar_function_ct1 using scalar_function_stb tags (-38, -32456, 509722288, -1404014954778348330, 87, 8879, 3351927345, 1840080781675115605, 3.002364316200592e+38, 6.698140580387119e+37, "bktezshfyvmrmgzwrwerytfwudlblkyyxismpommiqpqsptpiucptwqutzhajxbiitqxkrpobqhgqvjlvgsudewmelpunjspurbpbbwypvgbwjfrwidrchnojtxyhrwfjwgdiabzfoujxkwcjjxjqsrnhmryjhrykldmdfiwircdfahldtrtuafzvybkihyjatiqivbtpydjtmbfddcgyzjuqidwcchtsamnwyqwvajftayyvfrmqcqygbxmxgjx", "ddlxkxhrvviwnjeqhewbercnlontwbsyevcjsocrwyupautsjkdzqbwuzsuetptgsdfyjzfkqyobkysikpaxtqqonxtocfowaehgovshwyciyzfmdmcmwaolkhdunfhwhcanetepxyppuullxnclockmadyaaufywllwburgsfxizcjgzvboydpqymlwgktslclidbcwiyyubyuvhjgwldkgxswigjkpbpslvlsbigdlmuldmtbqencbntbaohxr", False) ;
sql create table test.scalar_function_tb1 (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 binary(256), c12 nchar(256), c13 bool) ;
sql create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20)) tags (t1 int);
sql create table scalar_ct1 using scalar_stb tags(10);
sql create table if not exists scalar_tb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20));
sql create stream stb_abs_stream trigger at_once into output_abs_stb as select ts, abs(c1), abs(c2), c3 from scalar_stb;
sql create stream ctb_abs_stream trigger at_once into output_abs_ctb as select ts, abs(c1), abs(c2), c3 from scalar_ct1;
sql create stream tb_abs_stream trigger at_once into output_abs_tb as select ts, abs(c1), abs(c2), c3 from scalar_tb;
sql create stream stb_acos_stream trigger at_once into output_acos_stb as select ts, acos(c1), acos(c2), c3 from scalar_stb;
sql create stream ctb_acos_stream trigger at_once into output_acos_ctb as select ts, acos(c1), acos(c2), c3 from scalar_ct1;
sql create stream tb_acos_stream trigger at_once into output_acos_tb as select ts, acos(c1), acos(c2), c3 from scalar_tb;
sql create stream stb_asin_stream trigger at_once into output_asin_stb as select ts, asin(c1), asin(c2), c3 from scalar_stb;
sql create stream ctb_asin_stream trigger at_once into output_asin_ctb as select ts, asin(c1), asin(c2), c3 from scalar_ct1;
sql create stream tb_asin_stream trigger at_once into output_asin_tb as select ts, asin(c1), asin(c2), c3 from scalar_tb;
sql create stream stb_atan_stream trigger at_once into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb;
sql create stream ctb_atan_stream trigger at_once into output_atan_ctb as select ts, atan(c1), atan(c2), c3 from scalar_ct1;
sql create stream tb_atan_stream trigger at_once into output_atan_tb as select ts, atan(c1), atan(c2), c3 from scalar_tb;
sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb;
sql create stream ctb_ceil_stream trigger at_once into output_ceil_ctb as select ts, ceil(c1), ceil(c2), c3 from scalar_ct1;
sql create stream tb_ceil_stream trigger at_once into output_ceil_tb as select ts, ceil(c1), ceil(c2), c3 from scalar_tb;
sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb;
sql create stream ctb_cos_stream trigger at_once into output_cos_ctb as select ts, cos(c1), cos(c2), c3 from scalar_ct1;
sql create stream tb_cos_stream trigger at_once into output_cos_tb as select ts, cos(c1), cos(c2), c3 from scalar_tb;
sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb;
sql create stream ctb_floor_stream trigger at_once into output_floor_ctb as select ts, floor(c1), floor(c2), c3 from scalar_ct1;
sql create stream tb_floor_stream trigger at_once into output_floor_tb as select ts, floor(c1), floor(c2), c3 from scalar_tb;
sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb;
sql create stream ctb_log_stream trigger at_once into output_log_ctb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_ct1;
sql create stream tb_log_stream trigger at_once into output_log_tb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_tb;
sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb;
sql create stream ctb_pow_stream trigger at_once into output_pow_ctb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_ct1;
sql create stream tb_pow_stream trigger at_once into output_pow_tb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_tb;
sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb;
sql create stream ctb_round_stream trigger at_once into output_round_ctb as select ts, round(c1), round(c2), c3 from scalar_ct1;
sql create stream tb_round_stream trigger at_once into output_round_tb as select ts, round(c1), round(c2), c3 from scalar_tb;
sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb;
sql create stream ctb_sin_stream trigger at_once into output_sin_ctb as select ts, sin(c1), sin(c2), c3 from scalar_ct1;
sql create stream tb_sin_stream trigger at_once into output_sin_tb as select ts, sin(c1), sin(c2), c3 from scalar_tb;
sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb;
sql create stream ctb_sqrt_stream trigger at_once into output_sqrt_ctb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_ct1;
sql create stream tb_sqrt_stream trigger at_once into output_sqrt_tb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_tb;
sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb;
sql create stream ctb_tan_stream trigger at_once into output_tan_ctb as select ts, tan(c1), tan(c2), c3 from scalar_ct1;
sql create stream tb_tan_stream trigger at_once into output_tan_tb as select ts, tan(c1), tan(c2), c3 from scalar_tb;
sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb;
sql create stream ctb_char_length_stream into output_char_length_ctb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_ct1;
sql create stream tb_char_length_stream into output_char_length_tb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_tb;
sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb;
sql create stream ctb_concat_stream into output_concat_ctb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_stream into output_concat_tb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_tb;
sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb;
sql create stream ctb_concat_ws_stream into output_concat_ws_ctb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_ws_stream into output_concat_ws_tb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_tb;
sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb;
sql create stream ctb_length_stream into output_length_ctb as select ts, length(c3), length(c4), length(c5) from scalar_ct1;
sql create stream tb_length_stream into output_length_tb as select ts, length(c3), length(c4), length(c5) from scalar_tb;
sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb;
sql create stream ctb_lower_stream into output_lower_ctb as select ts, lower(c3), lower(c4), lower(c5) from scalar_ct1;
sql create stream tb_lower_stream into output_lower_tb as select ts, lower(c3), lower(c4), lower(c5) from scalar_tb;
sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb;
sql create stream ctb_ltrim_stream into output_ltrim_ctb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_ct1;
sql create stream tb_ltrim_stream into output_ltrim_tb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_tb;
sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb;
sql create stream ctb_rtrim_stream into output_rtrim_ctb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_ct1;
sql create stream tb_rtrim_stream into output_rtrim_tb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_tb;
sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb;
sql create stream ctb_substr_stream into output_substr_ctb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_ct1;
sql create stream tb_substr_stream into output_substr_tb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_tb;
sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb;
sql create stream ctb_upper_stream into output_upper_ctb as select ts, upper(c3), upper(c4), upper(c5) from scalar_ct1;
sql create stream tb_upper_stream into output_upper_tb as select ts, upper(c3), upper(c4), upper(c5) from scalar_tb;
sql insert into scalar_ct1 values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");
sql insert into scalar_ct1 values (1656668180503+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");
sql insert into scalar_ct1 values (1656668180503+2s, 0, Null, "hebei", "TDengine", Null);
sql insert into scalar_tb values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");
sql insert into scalar_tb values (1656668180503+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");
sql insert into scalar_tb values (1656668180503+2s, 0, Null, "hebei", "TDengine", Null);
sql insert into scalar_ct1 values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_ct1 values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
print ========== step6 repeat
sql drop database test;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册