diff --git a/tests/system-test/0-others/udfTest.py b/tests/system-test/0-others/udfTest.py index 0a998aee2bb3ed0df5fcc4ca1c0826738a3473b3..342b5685a0edc488481c6a81c16210fd480edbd2 100644 --- a/tests/system-test/0-others/udfTest.py +++ b/tests/system-test/0-others/udfTest.py @@ -1,3 +1,4 @@ +from distutils.log import error import taos import sys import time @@ -43,12 +44,14 @@ class TDTestCase: libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") os.system("mkdir /tmp/udf/") - os.system("sudo cp %s /tmp/udf/ "%libudf1.replace("\n" ,"")) - os.system("sudo cp %s /tmp/udf/ "%libudf2.replace("\n" ,"")) + os.system("cp %s /tmp/udf/ "%libudf1.replace("\n" ,"")) + os.system("cp %s /tmp/udf/ "%libudf2.replace("\n" ,"")) def prepare_data(self): + tdSql.execute("drop database if exists db ") + tdSql.execute("create database if not exists db days 300") tdSql.execute("use db") tdSql.execute( '''create table stb1 @@ -117,6 +120,17 @@ class TDTestCase: ''' ) + # udf functions with join + ts_start = 1652517451000 + tdSql.execute("create stable st (ts timestamp , c1 int , c2 int ,c3 double ,c4 double ) tags(ind int)") + tdSql.execute("create table sub1 using st tags(1)") + tdSql.execute("create table sub2 using st tags(2)") + + for i in range(10): + ts = ts_start + i *1000 + tdSql.execute(" insert into sub1 values({} , {},{},{},{})".format(ts,i ,i*10,i*100.0,i*1000.0)) + tdSql.execute(" insert into sub2 values({} , {},{},{},{})".format(ts,i ,i*10,i*100.0,i*1000.0)) + def create_udf_function(self): @@ -378,17 +392,6 @@ class TDTestCase: tdSql.checkData(0,1,88) tdSql.checkData(0,2,-99.990000000) tdSql.checkData(0,3,88) - - # udf functions with join - ts_start = 1652517451000 - tdSql.execute("create stable st (ts timestamp , c1 int , c2 int ,c3 double ,c4 double ) tags(ind int)") - tdSql.execute("create table sub1 using st tags(1)") - tdSql.execute("create table sub2 using st tags(2)") - - for i in range(10): - ts = ts_start + i *1000 - tdSql.execute(" insert into sub1 values({} , {},{},{},{})".format(ts,i ,i*10,i*100.0,i*1000.0)) - tdSql.execute(" insert into sub2 values({} , {},{},{},{})".format(ts,i ,i*10,i*100.0,i*1000.0)) tdSql.query("select sub1.c1, sub2.c2 from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") tdSql.checkData(0,0,0) @@ -468,10 +471,103 @@ class TDTestCase: tdSql.checkData(0,0,169.661427555) tdSql.checkData(0,1,169.661427555) + def try_query_sql(self): + udf1_sqls = [ + "select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb" , + "select c1 , udf1(c1) ,c2 ,udf1(c2), c3 ,udf1(c3), c4 ,udf1(c4) from stb1 order by c1" , + "select udf1(num1) , max(num1) from tb;" , + "select udf1(num1) , min(num1) from tb;" , + "select udf1(num1) , top(num1,1) from tb;" , + "select udf1(num1) , bottom(num1,1) from tb;" , + "select udf1(c1) , max(c1) from stb1;" , + "select udf1(c1) , min(c1) from stb1;" , + "select udf1(c1) , top(c1 ,1) from stb1;" , + "select udf1(c1) , bottom(c1,1) from stb1;" , + "select udf1(num1) , abs(num1) from tb;" , + "select udf1(num1) , csum(num1) from tb;" , + "select udf1(c1) , csum(c1) from stb1;" , + "select udf1(c1) , abs(c1) from stb1;" , + "select abs(udf1(c1)) , abs(ceil(c1)) from stb1 order by ts;" , + "select abs(udf1(c1)) , abs(ceil(c1)) from ct1 order by ts;" , + "select abs(udf1(c1)) , abs(ceil(c1)) from stb1 where c1 is null order by ts;" , + "select c1 ,udf1(c1) , c6 ,udf1(c6) from stb1 where c1 > 8 order by ts" , + "select udf1(sub1.c1), udf1(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select sub1.c1 , udf1(sub1.c1), sub2.c2 ,udf1(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select udf1(c1) from ct1 group by c1" , + "select udf1(c1) from stb1 group by c1" , + "select c1,c2, udf1(c1,c2) from ct1 group by c1,c2" , + "select c1,c2, udf1(c1,c2) from stb1 group by c1,c2" , + "select num1,num2,num3,udf1(num1,num2,num3) from tb" , + "select c1,c6,udf1(c1,c6) from stb1 order by ts" , + "select abs(udf1(c1,c6,c1,c6)) , abs(ceil(c1)) from stb1 where c1 is not null order by ts;" + ] + udf2_sqls = ["select udf2(sub1.c1), udf2(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select udf2(c1) from stb1 group by 1-udf1(c1)" , + "select udf2(num1) ,udf2(num2), udf2(num3) from tb" , + "select udf2(num1)+100 ,udf2(num2)-100, udf2(num3)*100 ,udf2(num3)/100 from tb" , + "select udf2(c1) ,udf2(c6) from stb1 " , + "select udf2(c1)+100 ,udf2(c6)-100 ,udf2(c1)*100 ,udf2(c6)/100 from stb1 " , + "select udf2(c1+100) ,udf2(c6-100) ,udf2(c1*100) ,udf2(c6/100) from ct1" , + "select udf2(c1+100) ,udf2(c6-100) ,udf2(c1*100) ,udf2(c6/100) from stb1 " , + "select udf2(c1) from ct1 group by c1" , + "select udf2(c1) from stb1 group by c1" , + "select c1,c2, udf2(c1,c6) from ct1 group by c1,c2" , + "select c1,c2, udf2(c1,c6) from stb1 group by c1,c2" , + "select udf2(c1) from stb1 group by udf1(c1)" , + "select udf2(c1) from stb1 group by floor(c1)" , + "select udf2(c1) from stb1 group by floor(c1) order by udf2(c1)" , + + "select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"] + + return udf1_sqls ,udf2_sqls + + def unexpected_create(self): + + tdLog.info(" create function with out bufsize ") + tdSql.query("drop function udf1 ") + tdSql.query("drop function udf2 ") + + # create function without buffer + tdSql.execute("create function udf1 as '/tmp/udf/libudf1.so' outputtype int") + tdSql.execute("create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double") + udf1_sqls ,udf2_sqls = self.try_query_sql() + + for scalar_sql in udf1_sqls: + tdSql.query(scalar_sql) + for aggregate_sql in udf2_sqls: + tdSql.error(aggregate_sql) + + # create function without aggregate + + tdLog.info(" create function with out aggregate ") + tdSql.query("drop function udf1 ") + tdSql.query("drop function udf2 ") + + # create function without buffer + tdSql.execute("create aggregate function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8 ") + tdSql.execute("create function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8") + udf1_sqls ,udf2_sqls = self.try_query_sql() + + for scalar_sql in udf1_sqls: + tdSql.error(scalar_sql) + for aggregate_sql in udf2_sqls: + tdSql.error(aggregate_sql) + + tdSql.execute(" create function db as '/tmp/udf/libudf1.so' outputtype int bufSize 8 ") + tdSql.execute(" create aggregate function test as '/tmp/udf/libudf1.so' outputtype int bufSize 8 ") + tdSql.error(" select db(c1) from stb1 ") + tdSql.error(" select db(c1,c6), db(c6) from stb1 ") + tdSql.error(" select db(num1,num2), db(num1) from tb ") + tdSql.error(" select test(c1) from stb1 ") + tdSql.error(" select test(c1,c6), test(c6) from stb1 ") + tdSql.error(" select test(num1,num2), test(num1) from tb ") + - tdSql.query("select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") def loop_kill_udfd(self): @@ -484,7 +580,7 @@ class TDTestCase: cfgPath = buildPath + "/../sim/dnode1/cfg" udfdPath = buildPath +'/build/bin/udfd' - for i in range(5): + for i in range(3): tdLog.info(" loop restart udfd %d_th" % i) @@ -492,7 +588,7 @@ class TDTestCase: tdSql.checkData(0,0,169.661427555) tdSql.checkData(0,1,169.661427555) # stop udfd cmds - get_processID = "ps -ef | grep -w udfd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}'" + get_processID = "ps -ef | grep -w udfd | grep -v grep| grep -v defunct | awk '{print $2}'" processID = subprocess.check_output(get_processID, shell=True).decode("utf-8") stop_udfd = " kill -9 %s" % processID os.system(stop_udfd) @@ -507,11 +603,27 @@ class TDTestCase: # start_udfd = "nohup " + udfdPath +'-c' +cfgPath +" > /dev/null 2>&1 &" # tdLog.info("start udfd : %s " % start_udfd) - + def test_function_name(self): + tdLog.info(" create function name is not build_in functions ") + tdSql.execute(" drop function udf1 ") + tdSql.execute(" drop function udf2 ") + tdSql.error("create function max as '/tmp/udf/libudf1.so' outputtype int bufSize 8") + tdSql.error("create aggregate function sum as '/tmp/udf/libudf2.so' outputtype double bufSize 8") + tdSql.error("create function max as '/tmp/udf/libudf1.so' outputtype int bufSize 8") + tdSql.error("create aggregate function sum as '/tmp/udf/libudf2.so' outputtype double bufSize 8") + tdSql.error("create aggregate function tbname as '/tmp/udf/libudf2.so' outputtype double bufSize 8") + tdSql.error("create aggregate function function as '/tmp/udf/libudf2.so' outputtype double bufSize 8") + tdSql.error("create aggregate function stable as '/tmp/udf/libudf2.so' outputtype double bufSize 8") + tdSql.error("create aggregate function union as '/tmp/udf/libudf2.so' outputtype double bufSize 8") + tdSql.error("create aggregate function 123 as '/tmp/udf/libudf2.so' outputtype double bufSize 8") + tdSql.error("create aggregate function 123db as '/tmp/udf/libudf2.so' outputtype double bufSize 8") + tdSql.error("create aggregate function mnode as '/tmp/udf/libudf2.so' outputtype double bufSize 8") + def restart_taosd_query_udf(self): + self.create_udf_function() + for i in range(5): - time.sleep(5) tdLog.info(" this is %d_th restart taosd " %i) tdSql.execute("use db ") tdSql.query("select count(*) from stb1") @@ -520,21 +632,29 @@ class TDTestCase: tdSql.checkData(0,0,169.661427555) tdSql.checkData(0,1,169.661427555) tdDnodes.stop(1) - time.sleep(2) tdDnodes.start(1) - time.sleep(5) - + time.sleep(2) def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring - tdSql.prepare() + print(" env is ok for all ") self.prepare_udf_so() self.prepare_data() self.create_udf_function() self.basic_udf_query() self.loop_kill_udfd() - # self.restart_taosd_query_udf() + + self.unexpected_create() + tdSql.execute(" drop function udf1 ") + tdSql.execute(" drop function udf2 ") + self.create_udf_function() + time.sleep(2) + self.basic_udf_query() + self.test_function_name() + self.restart_taosd_query_udf() + + def stop(self): tdSql.close() diff --git a/tests/system-test/0-others/udf_cluster.py b/tests/system-test/0-others/udf_cluster.py new file mode 100644 index 0000000000000000000000000000000000000000..de998e9087c2bc8ef1ff3b1aab09695cf57fd8f4 --- /dev/null +++ b/tests/system-test/0-others/udf_cluster.py @@ -0,0 +1,338 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +import socket +import subprocess + +class MyDnodes(TDDnodes): + def __init__(self ,dnodes_lists): + super(MyDnodes,self).__init__() + self.dnodes = dnodes_lists # dnode must be TDDnode instance + self.simDeployed = False + +class TDTestCase: + + def init(self,conn ,logSql): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + self.depoly_cluster(3) + self.master_dnode = self.TDDnodes.dnodes[0] + conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir) + tdSql.init(conn1.cursor()) + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def prepare_udf_so(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + print(projPath) + + libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + os.system("mkdir /tmp/udf/") + os.system("sudo cp %s /tmp/udf/ "%libudf1.replace("\n" ,"")) + os.system("sudo cp %s /tmp/udf/ "%libudf2.replace("\n" ,"")) + + + def prepare_data(self): + + tdSql.execute("drop database if exists db") + tdSql.execute("create database if not exists db replica 1 days 300") + tdSql.execute("use db") + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + for i in range(9): + tdSql.execute( + f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute( + f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )") + tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + + tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()+9d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + + tdSql.execute( + f'''insert into t1 values + ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) + ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) + ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) + ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) + ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) + ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ''' + ) + + tdSql.execute("create table tb (ts timestamp , num1 int , num2 int, num3 double , num4 binary(30))") + tdSql.execute( + f'''insert into tb values + ( '2020-04-21 01:01:01.000', NULL, 1, 1, "binary1" ) + ( '2020-10-21 01:01:01.000', 1, 1, 1.11, "binary1" ) + ( '2020-12-31 01:01:01.000', 2, 22222, 22, "binary1" ) + ( '2021-01-01 01:01:06.000', 3, 33333, 33, "binary1" ) + ( '2021-05-07 01:01:10.000', 4, 44444, 44, "binary1" ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, "binary1" ) + ( '2021-09-30 01:01:16.000', 5, 55555, 55, "binary1" ) + ( '2022-02-01 01:01:20.000', 6, 66666, 66, "binary1" ) + ( '2022-10-28 01:01:26.000', 0, 00000, 00, "binary1" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -88, "binary1" ) + ( '2022-12-31 01:01:36.000', 9, -9999999, -99, "binary1" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, "binary1" ) + ''' + ) + + + def create_udf_function(self ): + + for i in range(10): + # create scalar functions + tdSql.execute("create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8;") + + # create aggregate functions + + tdSql.execute("create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8;") + + # functions = tdSql.getResult("show functions") + # function_nums = len(functions) + # if function_nums == 2: + # tdLog.info("create two udf functions success ") + + # drop functions + + tdSql.execute("drop function udf1") + tdSql.execute("drop function udf2") + + functions = tdSql.getResult("show functions") + for function in functions: + if "udf1" in function[0] or "udf2" in function[0]: + tdLog.info("drop udf functions failed ") + tdLog.exit("drop udf functions failed") + + tdLog.info("drop two udf functions success ") + + # create scalar functions + tdSql.execute("create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8;") + + # create aggregate functions + + tdSql.execute("create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8;") + + functions = tdSql.getResult("show functions") + function_nums = len(functions) + if function_nums == 2: + tdLog.info("create two udf functions success ") + + def basic_udf_query(self , dnode): + + mytdSql = self.getConnection(dnode) + # scalar functions + + mytdSql.execute("use db ") + + result = mytdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb") + data = result.fetch_all() + print(data) + if data == [(None, None, 1, 88, 1.0, 88, 'binary1', 88), (1, 88, 1, 88, 1.11, 88, 'binary1', 88), (2, 88, 22222, 88, 22.0, 88, 'binary1', 88), (3, 88, 33333, 88, 33.0, 88, 'binary1', 88), (4, 88, 44444, 88, 44.0, 88, 'binary1', 88), (None, None, None, None, None, None, 'binary1', 88), (5, 88, 55555, 88, 55.0, 88, 'binary1', 88), (6, 88, 66666, 88, 66.0, 88, 'binary1', 88), (0, 88, 0, 88, 0.0, 88, 'binary1', 88), (8, 88, -88888, 88, -88.0, 88, 'binary1', 88), (9, 88, -9999999, 88, -99.0, 88, 'binary1', 88), (None, None, None, None, None, None, 'binary1', 88)]: + tdLog.info(" UDF query check ok at :dnode_index %s" %dnode.index) + else: + tdLog.info(" UDF query check failed at :dnode_index %s" %dnode.index) + tdLog.exit("query check failed at :dnode_index %s" %dnode.index ) + + result = mytdSql.query("select udf1(c1,c6), udf1(c1) ,udf1(c6) from stb1 order by ts") + data = result.fetch_all() + print(data) + if data == [(None, None, None), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (None, None, None), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (88, 88, 88), (None, 88, None), (88, 88, 88), (None, None, None)]: + tdLog.info(" UDF query check ok at :dnode_index %s" %dnode.index) + else: + tdLog.info(" UDF query check failed at :dnode_index %s" %dnode.index) + tdLog.exit("query check failed at :dnode_index %s" %dnode.index ) + + result = mytdSql.query("select udf2(c1,c6), udf2(c1) ,udf2(c6) from stb1 ") + data = result.fetch_all() + print(data) + expect_data = [(266.47194411419747, 25.514701644346147, 265.247614503882)] + status = True + for index in range(len(expect_data[0])): + if abs(expect_data[0][index] - data[0][index]) >0.0001: + status = False + break + + if status : + tdLog.info(" UDF query check ok at :dnode_index %s" %dnode.index) + else: + tdLog.info(" UDF query check failed at :dnode_index %s" %dnode.index) + tdLog.exit("query check failed at :dnode_index %s" %dnode.index ) + + result = mytdSql.query("select udf2(num1,num2,num3), udf2(num1) ,udf2(num2) from tb ") + data = result.fetch_all() + print(data) + expect_data = [(10000949.554622812, 15.362291495737216, 10000949.553189287)] + status = True + for index in range(len(expect_data[0])): + if abs(expect_data[0][index] - data[0][index]) >0.0001: + status = False + break + + if status : + tdLog.info(" UDF query check ok at :dnode_index %s" %dnode.index) + else: + tdLog.info(" UDF query check failed at :dnode_index %s" %dnode.index) + tdLog.exit("query check failed at :dnode_index %s" %dnode.index ) + + + def check_UDF_query(self): + + for i in range(20): + for dnode in self.TDDnodes.dnodes: + self.basic_udf_query(dnode) + + + def depoly_cluster(self ,dnodes_nums): + + testCluster = False + valgrind = 0 + hostname = socket.gethostname() + dnodes = [] + start_port = 6030 + for num in range(1, dnodes_nums+1): + dnode = TDDnode(num) + dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}") + dnode.addExtraCfg("fqdn", f"{hostname}") + dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}") + dnode.addExtraCfg("monitorFqdn", hostname) + dnode.addExtraCfg("monitorPort", 7043) + dnodes.append(dnode) + + self.TDDnodes = MyDnodes(dnodes) + self.TDDnodes.init("") + self.TDDnodes.setTestCluster(testCluster) + self.TDDnodes.setValgrind(valgrind) + self.TDDnodes.stopAll() + for dnode in self.TDDnodes.dnodes: + self.TDDnodes.deploy(dnode.index,{}) + + for dnode in self.TDDnodes.dnodes: + self.TDDnodes.start(dnode.index) + + # create cluster + + for dnode in self.TDDnodes.dnodes: + print(dnode.cfgDict) + dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] + dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0] + dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] + cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;" + print(cmd) + os.system(cmd) + + time.sleep(2) + tdLog.info(" create cluster done! ") + + + + def getConnection(self, dnode): + host = dnode.cfgDict["fqdn"] + port = dnode.cfgDict["serverPort"] + config_dir = dnode.cfgDir + return taos.connect(host=host, port=int(port), config=config_dir) + + def restart_udfd(self, dnode): + + buildPath = self.getBuildPath() + + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + + cfgPath = dnode.cfgDir + + udfdPath = buildPath +'/build/bin/udfd' + + for i in range(5): + + tdLog.info(" loop restart udfd %d_th at dnode_index : %s" % (i ,dnode.index)) + self.basic_udf_query(dnode) + # stop udfd cmds + get_processID = "ps -ef | grep -w udfd | grep %s | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}'"%cfgPath + processID = subprocess.check_output(get_processID, shell=True).decode("utf-8") + stop_udfd = " kill -9 %s" % processID + os.system(stop_udfd) + self.basic_udf_query(dnode) + + def test_restart_udfd_All_dnodes(self): + + for dnode in self.TDDnodes.dnodes: + tdLog.info(" start restart udfd for dnode_index :%s" %dnode.index ) + self.restart_udfd(dnode) + + + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring + print(self.master_dnode.cfgDict) + self.prepare_data() + self.prepare_udf_so() + self.create_udf_function() + self.basic_udf_query(self.master_dnode) + # self.check_UDF_query() + self.restart_udfd(self.master_dnode) + # self.test_restart_udfd_All_dnodes() + + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file