Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6a65afb3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
6a65afb3
编写于
5月 19, 2022
作者:
H
Hui Li
提交者:
GitHub
5月 19, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12711 from taosdata/test/udf_test
test : add test case for udf create and basic query for all
上级
2288428d
10101cb8
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
481 addition
and
23 deletion
+481
-23
tests/system-test/0-others/udfTest.py
tests/system-test/0-others/udfTest.py
+143
-23
tests/system-test/0-others/udf_cluster.py
tests/system-test/0-others/udf_cluster.py
+338
-0
未找到文件。
tests/system-test/0-others/udfTest.py
浏览文件 @
6a65afb3
from
distutils.log
import
error
import
taos
import
sys
import
time
...
...
@@ -43,12 +44,14 @@ class TDTestCase:
libudf1
=
subprocess
.
Popen
(
'find %s -name "libudf1.so"|grep lib|head -n1'
%
projPath
,
shell
=
True
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
).
stdout
.
read
().
decode
(
"utf-8"
)
libudf2
=
subprocess
.
Popen
(
'find %s -name "libudf2.so"|grep lib|head -n1'
%
projPath
,
shell
=
True
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
).
stdout
.
read
().
decode
(
"utf-8"
)
os
.
system
(
"mkdir /tmp/udf/"
)
os
.
system
(
"
sudo
cp %s /tmp/udf/ "
%
libudf1
.
replace
(
"
\n
"
,
""
))
os
.
system
(
"
sudo
cp %s /tmp/udf/ "
%
libudf2
.
replace
(
"
\n
"
,
""
))
os
.
system
(
"cp %s /tmp/udf/ "
%
libudf1
.
replace
(
"
\n
"
,
""
))
os
.
system
(
"cp %s /tmp/udf/ "
%
libudf2
.
replace
(
"
\n
"
,
""
))
def
prepare_data
(
self
):
tdSql
.
execute
(
"drop database if exists db "
)
tdSql
.
execute
(
"create database if not exists db days 300"
)
tdSql
.
execute
(
"use db"
)
tdSql
.
execute
(
'''create table stb1
...
...
@@ -117,6 +120,17 @@ class TDTestCase:
'''
)
# udf functions with join
ts_start
=
1652517451000
tdSql
.
execute
(
"create stable st (ts timestamp , c1 int , c2 int ,c3 double ,c4 double ) tags(ind int)"
)
tdSql
.
execute
(
"create table sub1 using st tags(1)"
)
tdSql
.
execute
(
"create table sub2 using st tags(2)"
)
for
i
in
range
(
10
):
ts
=
ts_start
+
i
*
1000
tdSql
.
execute
(
" insert into sub1 values({} , {},{},{},{})"
.
format
(
ts
,
i
,
i
*
10
,
i
*
100.0
,
i
*
1000.0
))
tdSql
.
execute
(
" insert into sub2 values({} , {},{},{},{})"
.
format
(
ts
,
i
,
i
*
10
,
i
*
100.0
,
i
*
1000.0
))
def
create_udf_function
(
self
):
...
...
@@ -379,17 +393,6 @@ class TDTestCase:
tdSql
.
checkData
(
0
,
2
,
-
99.990000000
)
tdSql
.
checkData
(
0
,
3
,
88
)
# udf functions with join
ts_start
=
1652517451000
tdSql
.
execute
(
"create stable st (ts timestamp , c1 int , c2 int ,c3 double ,c4 double ) tags(ind int)"
)
tdSql
.
execute
(
"create table sub1 using st tags(1)"
)
tdSql
.
execute
(
"create table sub2 using st tags(2)"
)
for
i
in
range
(
10
):
ts
=
ts_start
+
i
*
1000
tdSql
.
execute
(
" insert into sub1 values({} , {},{},{},{})"
.
format
(
ts
,
i
,
i
*
10
,
i
*
100.0
,
i
*
1000.0
))
tdSql
.
execute
(
" insert into sub2 values({} , {},{},{},{})"
.
format
(
ts
,
i
,
i
*
10
,
i
*
100.0
,
i
*
1000.0
))
tdSql
.
query
(
"select sub1.c1, sub2.c2 from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"
)
tdSql
.
checkData
(
0
,
0
,
0
)
tdSql
.
checkData
(
0
,
1
,
0
)
...
...
@@ -468,10 +471,103 @@ class TDTestCase:
tdSql
.
checkData
(
0
,
0
,
169.661427555
)
tdSql
.
checkData
(
0
,
1
,
169.661427555
)
def
try_query_sql
(
self
):
udf1_sqls
=
[
"select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb"
,
"select c1 , udf1(c1) ,c2 ,udf1(c2), c3 ,udf1(c3), c4 ,udf1(c4) from stb1 order by c1"
,
"select udf1(num1) , max(num1) from tb;"
,
"select udf1(num1) , min(num1) from tb;"
,
"select udf1(num1) , top(num1,1) from tb;"
,
"select udf1(num1) , bottom(num1,1) from tb;"
,
"select udf1(c1) , max(c1) from stb1;"
,
"select udf1(c1) , min(c1) from stb1;"
,
"select udf1(c1) , top(c1 ,1) from stb1;"
,
"select udf1(c1) , bottom(c1,1) from stb1;"
,
"select udf1(num1) , abs(num1) from tb;"
,
"select udf1(num1) , csum(num1) from tb;"
,
"select udf1(c1) , csum(c1) from stb1;"
,
"select udf1(c1) , abs(c1) from stb1;"
,
"select abs(udf1(c1)) , abs(ceil(c1)) from stb1 order by ts;"
,
"select abs(udf1(c1)) , abs(ceil(c1)) from ct1 order by ts;"
,
"select abs(udf1(c1)) , abs(ceil(c1)) from stb1 where c1 is null order by ts;"
,
"select c1 ,udf1(c1) , c6 ,udf1(c6) from stb1 where c1 > 8 order by ts"
,
"select udf1(sub1.c1), udf1(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"
,
"select sub1.c1 , udf1(sub1.c1), sub2.c2 ,udf1(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"
,
"select udf1(c1) from ct1 group by c1"
,
"select udf1(c1) from stb1 group by c1"
,
"select c1,c2, udf1(c1,c2) from ct1 group by c1,c2"
,
"select c1,c2, udf1(c1,c2) from stb1 group by c1,c2"
,
"select num1,num2,num3,udf1(num1,num2,num3) from tb"
,
"select c1,c6,udf1(c1,c6) from stb1 order by ts"
,
"select abs(udf1(c1,c6,c1,c6)) , abs(ceil(c1)) from stb1 where c1 is not null order by ts;"
]
udf2_sqls
=
[
"select udf2(sub1.c1), udf2(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"
,
"select udf2(c1) from stb1 group by 1-udf1(c1)"
,
"select udf2(num1) ,udf2(num2), udf2(num3) from tb"
,
"select udf2(num1)+100 ,udf2(num2)-100, udf2(num3)*100 ,udf2(num3)/100 from tb"
,
"select udf2(c1) ,udf2(c6) from stb1 "
,
"select udf2(c1)+100 ,udf2(c6)-100 ,udf2(c1)*100 ,udf2(c6)/100 from stb1 "
,
"select udf2(c1+100) ,udf2(c6-100) ,udf2(c1*100) ,udf2(c6/100) from ct1"
,
"select udf2(c1+100) ,udf2(c6-100) ,udf2(c1*100) ,udf2(c6/100) from stb1 "
,
"select udf2(c1) from ct1 group by c1"
,
"select udf2(c1) from stb1 group by c1"
,
"select c1,c2, udf2(c1,c6) from ct1 group by c1,c2"
,
"select c1,c2, udf2(c1,c6) from stb1 group by c1,c2"
,
"select udf2(c1) from stb1 group by udf1(c1)"
,
"select udf2(c1) from stb1 group by floor(c1)"
,
"select udf2(c1) from stb1 group by floor(c1) order by udf2(c1)"
,
"select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"
,
"select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"
,
"select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"
,
"select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"
]
return
udf1_sqls
,
udf2_sqls
def
unexpected_create
(
self
):
tdSql
.
query
(
"select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"
)
tdLog
.
info
(
" create function with out bufsize "
)
tdSql
.
query
(
"drop function udf1 "
)
tdSql
.
query
(
"drop function udf2 "
)
# create function without buffer
tdSql
.
execute
(
"create function udf1 as '/tmp/udf/libudf1.so' outputtype int"
)
tdSql
.
execute
(
"create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double"
)
udf1_sqls
,
udf2_sqls
=
self
.
try_query_sql
()
for
scalar_sql
in
udf1_sqls
:
tdSql
.
query
(
scalar_sql
)
for
aggregate_sql
in
udf2_sqls
:
tdSql
.
error
(
aggregate_sql
)
# create function without aggregate
tdLog
.
info
(
" create function with out aggregate "
)
tdSql
.
query
(
"drop function udf1 "
)
tdSql
.
query
(
"drop function udf2 "
)
# create function without buffer
tdSql
.
execute
(
"create aggregate function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8 "
)
tdSql
.
execute
(
"create function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8"
)
udf1_sqls
,
udf2_sqls
=
self
.
try_query_sql
()
for
scalar_sql
in
udf1_sqls
:
tdSql
.
error
(
scalar_sql
)
for
aggregate_sql
in
udf2_sqls
:
tdSql
.
error
(
aggregate_sql
)
tdSql
.
execute
(
" create function db as '/tmp/udf/libudf1.so' outputtype int bufSize 8 "
)
tdSql
.
execute
(
" create aggregate function test as '/tmp/udf/libudf1.so' outputtype int bufSize 8 "
)
tdSql
.
error
(
" select db(c1) from stb1 "
)
tdSql
.
error
(
" select db(c1,c6), db(c6) from stb1 "
)
tdSql
.
error
(
" select db(num1,num2), db(num1) from tb "
)
tdSql
.
error
(
" select test(c1) from stb1 "
)
tdSql
.
error
(
" select test(c1,c6), test(c6) from stb1 "
)
tdSql
.
error
(
" select test(num1,num2), test(num1) from tb "
)
def
loop_kill_udfd
(
self
):
...
...
@@ -484,7 +580,7 @@ class TDTestCase:
cfgPath
=
buildPath
+
"/../sim/dnode1/cfg"
udfdPath
=
buildPath
+
'/build/bin/udfd'
for
i
in
range
(
5
):
for
i
in
range
(
3
):
tdLog
.
info
(
" loop restart udfd %d_th"
%
i
)
...
...
@@ -492,7 +588,7 @@ class TDTestCase:
tdSql
.
checkData
(
0
,
0
,
169.661427555
)
tdSql
.
checkData
(
0
,
1
,
169.661427555
)
# stop udfd cmds
get_processID
=
"ps -ef | grep -w udfd | grep
'root' | grep
-v grep| grep -v defunct | awk '{print $2}'"
get_processID
=
"ps -ef | grep -w udfd | grep -v grep| grep -v defunct | awk '{print $2}'"
processID
=
subprocess
.
check_output
(
get_processID
,
shell
=
True
).
decode
(
"utf-8"
)
stop_udfd
=
" kill -9 %s"
%
processID
os
.
system
(
stop_udfd
)
...
...
@@ -507,11 +603,27 @@ class TDTestCase:
# start_udfd = "nohup " + udfdPath +'-c' +cfgPath +" > /dev/null 2>&1 &"
# tdLog.info("start udfd : %s " % start_udfd)
def
test_function_name
(
self
):
tdLog
.
info
(
" create function name is not build_in functions "
)
tdSql
.
execute
(
" drop function udf1 "
)
tdSql
.
execute
(
" drop function udf2 "
)
tdSql
.
error
(
"create function max as '/tmp/udf/libudf1.so' outputtype int bufSize 8"
)
tdSql
.
error
(
"create aggregate function sum as '/tmp/udf/libudf2.so' outputtype double bufSize 8"
)
tdSql
.
error
(
"create function max as '/tmp/udf/libudf1.so' outputtype int bufSize 8"
)
tdSql
.
error
(
"create aggregate function sum as '/tmp/udf/libudf2.so' outputtype double bufSize 8"
)
tdSql
.
error
(
"create aggregate function tbname as '/tmp/udf/libudf2.so' outputtype double bufSize 8"
)
tdSql
.
error
(
"create aggregate function function as '/tmp/udf/libudf2.so' outputtype double bufSize 8"
)
tdSql
.
error
(
"create aggregate function stable as '/tmp/udf/libudf2.so' outputtype double bufSize 8"
)
tdSql
.
error
(
"create aggregate function union as '/tmp/udf/libudf2.so' outputtype double bufSize 8"
)
tdSql
.
error
(
"create aggregate function 123 as '/tmp/udf/libudf2.so' outputtype double bufSize 8"
)
tdSql
.
error
(
"create aggregate function 123db as '/tmp/udf/libudf2.so' outputtype double bufSize 8"
)
tdSql
.
error
(
"create aggregate function mnode as '/tmp/udf/libudf2.so' outputtype double bufSize 8"
)
def
restart_taosd_query_udf
(
self
):
self
.
create_udf_function
()
for
i
in
range
(
5
):
time
.
sleep
(
5
)
tdLog
.
info
(
" this is %d_th restart taosd "
%
i
)
tdSql
.
execute
(
"use db "
)
tdSql
.
query
(
"select count(*) from stb1"
)
...
...
@@ -520,21 +632,29 @@ class TDTestCase:
tdSql
.
checkData
(
0
,
0
,
169.661427555
)
tdSql
.
checkData
(
0
,
1
,
169.661427555
)
tdDnodes
.
stop
(
1
)
time
.
sleep
(
2
)
tdDnodes
.
start
(
1
)
time
.
sleep
(
5
)
time
.
sleep
(
2
)
def
run
(
self
):
# sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql
.
prepare
()
print
(
" env is ok for all "
)
self
.
prepare_udf_so
()
self
.
prepare_data
()
self
.
create_udf_function
()
self
.
basic_udf_query
()
self
.
loop_kill_udfd
()
# self.restart_taosd_query_udf()
self
.
unexpected_create
()
tdSql
.
execute
(
" drop function udf1 "
)
tdSql
.
execute
(
" drop function udf2 "
)
self
.
create_udf_function
()
time
.
sleep
(
2
)
self
.
basic_udf_query
()
self
.
test_function_name
()
self
.
restart_taosd_query_udf
()
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/0-others/udf_cluster.py
0 → 100644
浏览文件 @
6a65afb3
import
taos
import
sys
import
time
import
os
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
TDDnodes
from
util.dnodes
import
TDDnode
import
socket
import
subprocess
class
MyDnodes
(
TDDnodes
):
def
__init__
(
self
,
dnodes_lists
):
super
(
MyDnodes
,
self
).
__init__
()
self
.
dnodes
=
dnodes_lists
# dnode must be TDDnode instance
self
.
simDeployed
=
False
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
self
.
TDDnodes
=
None
self
.
depoly_cluster
(
3
)
self
.
master_dnode
=
self
.
TDDnodes
.
dnodes
[
0
]
conn1
=
taos
.
connect
(
self
.
master_dnode
.
cfgDict
[
"fqdn"
]
,
config
=
self
.
master_dnode
.
cfgDir
)
tdSql
.
init
(
conn1
.
cursor
())
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
prepare_udf_so
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
print
(
projPath
)
libudf1
=
subprocess
.
Popen
(
'find %s -name "libudf1.so"|grep lib|head -n1'
%
projPath
,
shell
=
True
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
).
stdout
.
read
().
decode
(
"utf-8"
)
libudf2
=
subprocess
.
Popen
(
'find %s -name "libudf2.so"|grep lib|head -n1'
%
projPath
,
shell
=
True
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
).
stdout
.
read
().
decode
(
"utf-8"
)
os
.
system
(
"mkdir /tmp/udf/"
)
os
.
system
(
"sudo cp %s /tmp/udf/ "
%
libudf1
.
replace
(
"
\n
"
,
""
))
os
.
system
(
"sudo cp %s /tmp/udf/ "
%
libudf2
.
replace
(
"
\n
"
,
""
))
def
prepare_data
(
self
):
tdSql
.
execute
(
"drop database if exists db"
)
tdSql
.
execute
(
"create database if not exists db replica 1 days 300"
)
tdSql
.
execute
(
"use db"
)
tdSql
.
execute
(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql
.
execute
(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for
i
in
range
(
4
):
tdSql
.
execute
(
f
'create table ct
{
i
+
1
}
using stb1 tags (
{
i
+
1
}
)'
)
for
i
in
range
(
9
):
tdSql
.
execute
(
f
"insert into ct1 values ( now()-
{
i
*
10
}
s,
{
1
*
i
}
,
{
11111
*
i
}
,
{
111
*
i
}
,
{
11
*
i
}
,
{
1.11
*
i
}
,
{
11.11
*
i
}
,
{
i
%
2
}
, 'binary
{
i
}
', 'nchar
{
i
}
', now()+
{
1
*
i
}
a )"
)
tdSql
.
execute
(
f
"insert into ct4 values ( now()-
{
i
*
90
}
d,
{
1
*
i
}
,
{
11111
*
i
}
,
{
111
*
i
}
,
{
11
*
i
}
,
{
1.11
*
i
}
,
{
11.11
*
i
}
,
{
i
%
2
}
, 'binary
{
i
}
', 'nchar
{
i
}
', now()+
{
1
*
i
}
a )"
)
tdSql
.
execute
(
"insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )"
)
tdSql
.
execute
(
"insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )"
)
tdSql
.
execute
(
"insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )"
)
tdSql
.
execute
(
"insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )"
)
tdSql
.
execute
(
"insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) "
)
tdSql
.
execute
(
"insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) "
)
tdSql
.
execute
(
"insert into ct4 values (now()+9d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) "
)
tdSql
.
execute
(
f
'''insert into t1 values
( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
'''
)
tdSql
.
execute
(
"create table tb (ts timestamp , num1 int , num2 int, num3 double , num4 binary(30))"
)
tdSql
.
execute
(
f
'''insert into tb values
( '2020-04-21 01:01:01.000', NULL, 1, 1, "binary1" )
( '2020-10-21 01:01:01.000', 1, 1, 1.11, "binary1" )
( '2020-12-31 01:01:01.000', 2, 22222, 22, "binary1" )
( '2021-01-01 01:01:06.000', 3, 33333, 33, "binary1" )
( '2021-05-07 01:01:10.000', 4, 44444, 44, "binary1" )
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, "binary1" )
( '2021-09-30 01:01:16.000', 5, 55555, 55, "binary1" )
( '2022-02-01 01:01:20.000', 6, 66666, 66, "binary1" )
( '2022-10-28 01:01:26.000', 0, 00000, 00, "binary1" )
( '2022-12-01 01:01:30.000', 8, -88888, -88, "binary1" )
( '2022-12-31 01:01:36.000', 9, -9999999, -99, "binary1" )
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, "binary1" )
'''
)
def
create_udf_function
(
self
):
for
i
in
range
(
10
):
# create scalar functions
tdSql
.
execute
(
"create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8;"
)
# create aggregate functions
tdSql
.
execute
(
"create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8;"
)
# functions = tdSql.getResult("show functions")
# function_nums = len(functions)
# if function_nums == 2:
# tdLog.info("create two udf functions success ")
# drop functions
tdSql
.
execute
(
"drop function udf1"
)
tdSql
.
execute
(
"drop function udf2"
)
functions
=
tdSql
.
getResult
(
"show functions"
)
for
function
in
functions
:
if
"udf1"
in
function
[
0
]
or
"udf2"
in
function
[
0
]:
tdLog
.
info
(
"drop udf functions failed "
)
tdLog
.
exit
(
"drop udf functions failed"
)
tdLog
.
info
(
"drop two udf functions success "
)
# create scalar functions
tdSql
.
execute
(
"create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8;"
)
# create aggregate functions
tdSql
.
execute
(
"create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8;"
)
functions
=
tdSql
.
getResult
(
"show functions"
)
function_nums
=
len
(
functions
)
if
function_nums
==
2
:
tdLog
.
info
(
"create two udf functions success "
)
def
basic_udf_query
(
self
,
dnode
):
mytdSql
=
self
.
getConnection
(
dnode
)
# scalar functions
mytdSql
.
execute
(
"use db "
)
result
=
mytdSql
.
query
(
"select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb"
)
data
=
result
.
fetch_all
()
print
(
data
)
if
data
==
[(
None
,
None
,
1
,
88
,
1.0
,
88
,
'binary1'
,
88
),
(
1
,
88
,
1
,
88
,
1.11
,
88
,
'binary1'
,
88
),
(
2
,
88
,
22222
,
88
,
22.0
,
88
,
'binary1'
,
88
),
(
3
,
88
,
33333
,
88
,
33.0
,
88
,
'binary1'
,
88
),
(
4
,
88
,
44444
,
88
,
44.0
,
88
,
'binary1'
,
88
),
(
None
,
None
,
None
,
None
,
None
,
None
,
'binary1'
,
88
),
(
5
,
88
,
55555
,
88
,
55.0
,
88
,
'binary1'
,
88
),
(
6
,
88
,
66666
,
88
,
66.0
,
88
,
'binary1'
,
88
),
(
0
,
88
,
0
,
88
,
0.0
,
88
,
'binary1'
,
88
),
(
8
,
88
,
-
88888
,
88
,
-
88.0
,
88
,
'binary1'
,
88
),
(
9
,
88
,
-
9999999
,
88
,
-
99.0
,
88
,
'binary1'
,
88
),
(
None
,
None
,
None
,
None
,
None
,
None
,
'binary1'
,
88
)]:
tdLog
.
info
(
" UDF query check ok at :dnode_index %s"
%
dnode
.
index
)
else
:
tdLog
.
info
(
" UDF query check failed at :dnode_index %s"
%
dnode
.
index
)
tdLog
.
exit
(
"query check failed at :dnode_index %s"
%
dnode
.
index
)
result
=
mytdSql
.
query
(
"select udf1(c1,c6), udf1(c1) ,udf1(c6) from stb1 order by ts"
)
data
=
result
.
fetch_all
()
print
(
data
)
if
data
==
[(
None
,
None
,
None
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
None
,
None
,
None
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
88
,
88
,
88
),
(
None
,
88
,
None
),
(
88
,
88
,
88
),
(
None
,
None
,
None
)]:
tdLog
.
info
(
" UDF query check ok at :dnode_index %s"
%
dnode
.
index
)
else
:
tdLog
.
info
(
" UDF query check failed at :dnode_index %s"
%
dnode
.
index
)
tdLog
.
exit
(
"query check failed at :dnode_index %s"
%
dnode
.
index
)
result
=
mytdSql
.
query
(
"select udf2(c1,c6), udf2(c1) ,udf2(c6) from stb1 "
)
data
=
result
.
fetch_all
()
print
(
data
)
expect_data
=
[(
266.47194411419747
,
25.514701644346147
,
265.247614503882
)]
status
=
True
for
index
in
range
(
len
(
expect_data
[
0
])):
if
abs
(
expect_data
[
0
][
index
]
-
data
[
0
][
index
])
>
0.0001
:
status
=
False
break
if
status
:
tdLog
.
info
(
" UDF query check ok at :dnode_index %s"
%
dnode
.
index
)
else
:
tdLog
.
info
(
" UDF query check failed at :dnode_index %s"
%
dnode
.
index
)
tdLog
.
exit
(
"query check failed at :dnode_index %s"
%
dnode
.
index
)
result
=
mytdSql
.
query
(
"select udf2(num1,num2,num3), udf2(num1) ,udf2(num2) from tb "
)
data
=
result
.
fetch_all
()
print
(
data
)
expect_data
=
[(
10000949.554622812
,
15.362291495737216
,
10000949.553189287
)]
status
=
True
for
index
in
range
(
len
(
expect_data
[
0
])):
if
abs
(
expect_data
[
0
][
index
]
-
data
[
0
][
index
])
>
0.0001
:
status
=
False
break
if
status
:
tdLog
.
info
(
" UDF query check ok at :dnode_index %s"
%
dnode
.
index
)
else
:
tdLog
.
info
(
" UDF query check failed at :dnode_index %s"
%
dnode
.
index
)
tdLog
.
exit
(
"query check failed at :dnode_index %s"
%
dnode
.
index
)
def
check_UDF_query
(
self
):
for
i
in
range
(
20
):
for
dnode
in
self
.
TDDnodes
.
dnodes
:
self
.
basic_udf_query
(
dnode
)
def
depoly_cluster
(
self
,
dnodes_nums
):
testCluster
=
False
valgrind
=
0
hostname
=
socket
.
gethostname
()
dnodes
=
[]
start_port
=
6030
for
num
in
range
(
1
,
dnodes_nums
+
1
):
dnode
=
TDDnode
(
num
)
dnode
.
addExtraCfg
(
"firstEp"
,
f
"
{
hostname
}
:
{
start_port
}
"
)
dnode
.
addExtraCfg
(
"fqdn"
,
f
"
{
hostname
}
"
)
dnode
.
addExtraCfg
(
"serverPort"
,
f
"
{
start_port
+
(
num
-
1
)
*
100
}
"
)
dnode
.
addExtraCfg
(
"monitorFqdn"
,
hostname
)
dnode
.
addExtraCfg
(
"monitorPort"
,
7043
)
dnodes
.
append
(
dnode
)
self
.
TDDnodes
=
MyDnodes
(
dnodes
)
self
.
TDDnodes
.
init
(
""
)
self
.
TDDnodes
.
setTestCluster
(
testCluster
)
self
.
TDDnodes
.
setValgrind
(
valgrind
)
self
.
TDDnodes
.
stopAll
()
for
dnode
in
self
.
TDDnodes
.
dnodes
:
self
.
TDDnodes
.
deploy
(
dnode
.
index
,{})
for
dnode
in
self
.
TDDnodes
.
dnodes
:
self
.
TDDnodes
.
start
(
dnode
.
index
)
# create cluster
for
dnode
in
self
.
TDDnodes
.
dnodes
:
print
(
dnode
.
cfgDict
)
dnode_id
=
dnode
.
cfgDict
[
"fqdn"
]
+
":"
+
dnode
.
cfgDict
[
"serverPort"
]
dnode_first_host
=
dnode
.
cfgDict
[
"firstEp"
].
split
(
":"
)[
0
]
dnode_first_port
=
dnode
.
cfgDict
[
"firstEp"
].
split
(
":"
)[
-
1
]
cmd
=
f
" taos -h
{
dnode_first_host
}
-P
{
dnode_first_port
}
-s ' create dnode
\"
{
dnode_id
}
\"
' ;"
print
(
cmd
)
os
.
system
(
cmd
)
time
.
sleep
(
2
)
tdLog
.
info
(
" create cluster done! "
)
def
getConnection
(
self
,
dnode
):
host
=
dnode
.
cfgDict
[
"fqdn"
]
port
=
dnode
.
cfgDict
[
"serverPort"
]
config_dir
=
dnode
.
cfgDir
return
taos
.
connect
(
host
=
host
,
port
=
int
(
port
),
config
=
config_dir
)
def
restart_udfd
(
self
,
dnode
):
buildPath
=
self
.
getBuildPath
()
if
(
buildPath
==
""
):
tdLog
.
exit
(
"taosd not found!"
)
else
:
tdLog
.
info
(
"taosd found in %s"
%
buildPath
)
cfgPath
=
dnode
.
cfgDir
udfdPath
=
buildPath
+
'/build/bin/udfd'
for
i
in
range
(
5
):
tdLog
.
info
(
" loop restart udfd %d_th at dnode_index : %s"
%
(
i
,
dnode
.
index
))
self
.
basic_udf_query
(
dnode
)
# stop udfd cmds
get_processID
=
"ps -ef | grep -w udfd | grep %s | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}'"
%
cfgPath
processID
=
subprocess
.
check_output
(
get_processID
,
shell
=
True
).
decode
(
"utf-8"
)
stop_udfd
=
" kill -9 %s"
%
processID
os
.
system
(
stop_udfd
)
self
.
basic_udf_query
(
dnode
)
def
test_restart_udfd_All_dnodes
(
self
):
for
dnode
in
self
.
TDDnodes
.
dnodes
:
tdLog
.
info
(
" start restart udfd for dnode_index :%s"
%
dnode
.
index
)
self
.
restart_udfd
(
dnode
)
def
run
(
self
):
# sourcery skip: extract-duplicate-method, remove-redundant-fstring
print
(
self
.
master_dnode
.
cfgDict
)
self
.
prepare_data
()
self
.
prepare_udf_so
()
self
.
create_udf_function
()
self
.
basic_udf_query
(
self
.
master_dnode
)
# self.check_UDF_query()
self
.
restart_udfd
(
self
.
master_dnode
)
# self.test_restart_udfd_All_dnodes()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录