udfpy_main.py 15.6 KB
Newer Older
A
Alex Duan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-


from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *

import random
import os
A
Alex Duan 已提交
23
import subprocess
A
Alex Duan 已提交
24 25


A
Alex Duan 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
class PerfDB:
    def __init__(self):
        self.sqls = []
        self.spends = []

    # execute
    def execute(self, sql):
        print(f"  perfdb execute {sql}")
        stime = time.time()
        ret = tdSql.execute(sql, 1)
        spend = time.time() - stime

        self.sqls.append(sql)
        self.spends.append(spend)
        return ret

    # query
    def query(self, sql):
        print(f"  perfdb query {sql}")
        start = time.time()
        ret = tdSql.query(sql, None, 1)
        spend = time.time() - start
        self.sqls.append(sql)
        self.spends.append(spend)
        return ret
    

A
Alex Duan 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
class TDTestCase:
    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        self.setsql = TDSetSql()

        # udf path
        self.udf_path = os.path.dirname(os.path.realpath(__file__)) + "/udfpy"


        self.column_dict = {
            'ts': 'timestamp',
            'col1': 'tinyint',
            'col2': 'smallint',
            'col3': 'int',
            'col4': 'bigint',
            'col5': 'tinyint unsigned',
            'col6': 'smallint unsigned',
            'col7': 'int unsigned',
            'col8': 'bigint unsigned',
            'col9': 'float',
            'col10': 'double',
            'col11': 'bool',
A
Alex Duan 已提交
77
            'col12': 'varchar(120)',
A
Alex Duan 已提交
78
            'col13': 'nchar(100)',
A
Alex Duan 已提交
79 80 81 82 83 84 85 86 87 88 89 90 91
        }
        self.tag_dict = {
            't1': 'tinyint',
            't2': 'smallint',
            't3': 'int',
            't4': 'bigint',
            't5': 'tinyint unsigned',
            't6': 'smallint unsigned',
            't7': 'int unsigned',
            't8': 'bigint unsigned',
            't9': 'float',
            't10': 'double',
            't11': 'bool',
A
Alex Duan 已提交
92
            't12': 'varchar(120)',
A
Alex Duan 已提交
93
            't13': 'nchar(100)',         
A
Alex Duan 已提交
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
        }

    def set_stb_sql(self,stbname,column_dict,tag_dict):
        column_sql = ''
        tag_sql = ''
        for k,v in column_dict.items():
            column_sql += f"{k} {v}, "
        for k,v in tag_dict.items():
            tag_sql += f"{k} {v}, "
        create_stb_sql = f'create stable {stbname} ({column_sql[:-2]}) tags ({tag_sql[:-2]})'
        return create_stb_sql
    
    # create stable and child tables
    def create_table(self, stbname, tbname, count):
        tdSql.prepare()
        tdSql.execute('use db')
        self.child_count = count
        self.stbname = stbname
        self.tbname  = tbname
        
        # create stable
        create_table_sql = self.set_stb_sql(stbname, self.column_dict, self.tag_dict)
        tdSql.execute(create_table_sql)

A
Alex Duan 已提交
118
        batch_size = 1000
A
Alex Duan 已提交
119 120 121
        # create child table
        for i in range(count):
            ti = i % 128
A
Alex Duan 已提交
122
            tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}"'
A
Alex Duan 已提交
123 124 125 126
            sql  = f'create table {tbname}{i} using {stbname} tags({tags});'
            tdSql.execute(sql)            
            if i % batch_size == 0:
               tdLog.info(f" create child table {i} ...")
A
Alex Duan 已提交
127 128 129

        tdLog.info(f" create {count} child tables ok.")

A
Alex Duan 已提交
130 131 132 133
    # create with dicts
    def create_sf_dicts(self, dicts, filename):
        for fun_name, out_type in dicts.items():
            sql = f' create function {fun_name} as "{self.udf_path}/{filename}" outputtype {out_type} language "Python" '
A
Alex Duan 已提交
134
            tdSql.execute(sql)
A
Alex Duan 已提交
135
            tdLog.info(sql)
A
Alex Duan 已提交
136 137

    # create_udfpy_function
A
Alex Duan 已提交
138
    def create_scalar_udfpy(self):
A
Alex Duan 已提交
139 140
        # scalar funciton
        self.scalar_funs = {
A
Alex Duan 已提交
141
            'sf0': 'timestamp',    
A
Alex Duan 已提交
142 143 144 145 146 147 148 149 150 151 152
            'sf1': 'tinyint',
            'sf2': 'smallint',
            'sf3': 'int',
            'sf4': 'bigint',
            'sf5': 'tinyint unsigned',
            'sf6': 'smallint unsigned',
            'sf7': 'int unsigned',
            'sf8': 'bigint unsigned',
            'sf9': 'float',
            'sf10': 'double',
            'sf11': 'bool',
A
Alex Duan 已提交
153
            'sf12': 'varchar(120)',
A
Alex Duan 已提交
154
            'sf13': 'nchar(100)'
A
Alex Duan 已提交
155 156 157 158 159 160 161 162 163 164 165 166 167 168
        }
        # agg function
        self.agg_funs = {
            'af1': 'tinyint',
            'af2': 'smallint',
            'af3': 'int',
            'af4': 'bigint',
            'af5': 'tinyint unsigned',
            'af6': 'smallint unsigned',
            'af7': 'int unsigned',
            'af8': 'bigint unsigned',
            'af9': 'float',
            'af10': 'double',
            'af11': 'bool',
A
Alex Duan 已提交
169
            'af12': 'varchar(120)',
A
Alex Duan 已提交
170
            'af13': 'nchar(100)',
A
Alex Duan 已提交
171 172 173
            'af14': 'timestamp'            
        }

A
Alex Duan 已提交
174
        # multi_args
A
Alex Duan 已提交
175
        self.create_sf_dicts(self.scalar_funs, "sf_origin.py")
A
Alex Duan 已提交
176 177 178 179 180 181 182
        fun_name = "sf_multi_args"
        self.create_udf_sf(fun_name, f'{fun_name}.py', "binary(1024)")

        # all type check null
        for col_name, col_type in self.column_dict.items():
             self.create_udf_sf(f"sf_null_{col_name}", "sf_null.py", col_type)

A
Alex Duan 已提交
183 184 185 186 187 188
        # concat
        fun_name = "sf_concat_var"
        self.create_udf_sf(fun_name, f'{fun_name}.py', "varchar(1024)")
        fun_name = "sf_concat_nch"
        self.create_udf_sf(fun_name, f'{fun_name}.py', "nchar(1024)")
             
A
Alex Duan 已提交
189

A
Alex Duan 已提交
190
    # fun_name == fun_name.py
A
Alex Duan 已提交
191 192
    def create_udf_sf(self, fun_name, file_name, out_type):
        sql = f'create function {fun_name} as "{self.udf_path}/{file_name}" outputtype {out_type} language "Python" '
A
Alex Duan 已提交
193
        tdSql.execute(sql)
A
Alex Duan 已提交
194
        tdLog.info(sql)
A
Alex Duan 已提交
195

A
Alex Duan 已提交
196 197
    def create_udf_af(self, fun_name, file_name, out_type, bufsize):
        sql = f'create aggregate function {fun_name} as "{self.udf_path}/{file_name}" outputtype {out_type} bufsize {bufsize} language "Python" '
A
Alex Duan 已提交
198
        tdSql.execute(sql)
A
Alex Duan 已提交
199
        tdLog.info(sql)
A
Alex Duan 已提交
200 201 202 203 204 205 206 207 208 209 210 211 212


    # sql1 query result eual with sql2
    def verify_same_result(self, sql1, sql2):
        # query
        result1 = tdSql.getResult(sql1)
        tdSql.query(sql2)
        
        for i, row in enumerate(result1):
            for j , val in enumerate(row):
                tdSql.checkData(i, j, result1[i][j])

    # same value like select col1, udf_fun1(col1) from st
213
    def verify_same_value(self, sql, col=0):
A
Alex Duan 已提交
214 215 216
        tdSql.query(sql)
        nrows = tdSql.getRows()
        for i in range(nrows):
217 218
            val = tdSql.getData(i, col)
            tdSql.checkData(i, col + 1, val)
A
Alex Duan 已提交
219 220 221 222 223 224 225 226 227 228 229 230 231

    # verify multi values
    def verify_same_multi_values(self, sql):
        tdSql.query(sql)
        nrows = tdSql.getRows()
        for i in range(nrows):
            udf_val = tdSql.getData(i, 0)
            vals = udf_val.split(',')
            for j,val in enumerate(vals, 1):
                tdSql.checkData(i, j, val)
            
    # query multi-args
    def query_multi_args(self):   
A
Alex Duan 已提交
232 233 234
        cols = list(self.column_dict.keys()) + list(self.tag_dict.keys())
        cols.remove("col13")
        cols.remove("t13")
A
Alex Duan 已提交
235
        cols.remove("ts")
A
Alex Duan 已提交
236
        ncols = len(cols)
A
Alex Duan 已提交
237
        print(cols)
A
Alex Duan 已提交
238
        for i in range(2, ncols):
A
Alex Duan 已提交
239 240
            sample = random.sample(cols, i)
            print(sample)
A
Alex Duan 已提交
241
            cols_name = ','.join(sample)
A
Alex Duan 已提交
242
            sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname} limit 10'
A
Alex Duan 已提交
243
            self.verify_same_multi_values(sql)
A
Alex Duan 已提交
244
            tdLog.info(sql)
A
Alex Duan 已提交
245 246 247 248 249

    
    # query_udfpy
    def query_scalar_udfpy(self):
        # col
A
Alex Duan 已提交
250 251
        for col_name, col_type in self.column_dict.items():
           for fun_name, out_type in self.scalar_funs.items():
A
Alex Duan 已提交
252
               if col_type == out_type :                   
A
Alex Duan 已提交
253
                    sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname} limit 10'
A
Alex Duan 已提交
254 255
                    tdLog.info(sql)
                    self.verify_same_value(sql)
A
Alex Duan 已提交
256
                    sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname}  limit 100) order by b,a desc'
A
Alex Duan 已提交
257 258
                    tdLog.info(sql)
                    self.verify_same_value(sql)
A
Alex Duan 已提交
259

A
Alex Duan 已提交
260 261 262
        # multi-args
        self.query_multi_args()       

A
Alex Duan 已提交
263
        # all type check null
A
Alex Duan 已提交
264
        for col_name, col_type in self.column_dict.items():             
A
Alex Duan 已提交
265 266 267
             fun_name = f"sf_null_{col_name}"
             sql = f'select {fun_name}({col_name}) from {self.stbname}'
             tdSql.query(sql)
A
Alex Duan 已提交
268 269 270 271 272 273 274 275
             if col_type != "timestamp":
                tdSql.checkData(0, 0, "None")
             else:
                val = tdSql.getData(0, 0)
                if val is not None:
                    tdLog.exit(f" check {sql} not expect None.")

        # concat
A
Alex Duan 已提交
276
        sql = f'select sf_concat_var(col12, t12), concat(col12, t12) from {self.stbname} limit 1000'
A
Alex Duan 已提交
277
        self.verify_same_value(sql)
A
Alex Duan 已提交
278
        sql = f'select sf_concat_nch(col13, t13), concat(col13, t13) from {self.stbname} limit 1000'
A
Alex Duan 已提交
279
        self.verify_same_value(sql)
A
Alex Duan 已提交
280

A
Alex Duan 已提交
281 282
    # create aggregate 
    def create_aggr_udfpy(self):
A
Alex Duan 已提交
283 284

        bufsize = 200 * 1024
A
Alex Duan 已提交
285
        # all type check null
A
Alex Duan 已提交
286
        for col_name, col_type in self.column_dict.items():
A
Alex Duan 已提交
287
             self.create_udf_af(f"af_null_{col_name}", "af_null.py", col_type, bufsize)
A
Alex Duan 已提交
288 289

        # min
A
Alex Duan 已提交
290 291
        file_name = "af_min.py"
        fun_name = "af_min_float"
A
Alex Duan 已提交
292
        self.create_udf_af(fun_name, file_name, f"float", bufsize)
A
Alex Duan 已提交
293
        fun_name = "af_min_int"
A
Alex Duan 已提交
294
        self.create_udf_af(fun_name, file_name, f"int", bufsize)
A
Alex Duan 已提交
295 296

        # sum
A
Alex Duan 已提交
297 298
        file_name = "af_sum.py"
        fun_name = "af_sum_float"
A
Alex Duan 已提交
299
        self.create_udf_af(fun_name, file_name, f"float", bufsize)
A
Alex Duan 已提交
300
        fun_name = "af_sum_int"
A
Alex Duan 已提交
301
        self.create_udf_af(fun_name, file_name, f"int", bufsize)
A
Alex Duan 已提交
302
        fun_name = "af_sum_bigint"
A
Alex Duan 已提交
303
        self.create_udf_af(fun_name, file_name, f"bigint", bufsize)
A
Alex Duan 已提交
304 305 306 307

        # count
        file_name = "af_count.py"
        fun_name = "af_count_float"
A
Alex Duan 已提交
308
        self.create_udf_af(fun_name, file_name, f"float", bufsize)
A
Alex Duan 已提交
309
        fun_name = "af_count_int"
A
Alex Duan 已提交
310
        self.create_udf_af(fun_name, file_name, f"int", bufsize)
A
Alex Duan 已提交
311
        fun_name = "af_count_bigint"
A
Alex Duan 已提交
312
        self.create_udf_af(fun_name, file_name, f"bigint", bufsize)
A
Alex Duan 已提交
313 314 315 316 317


    # query aggregate 
    def query_aggr_udfpy(self) :
        # all type check null
A
Alex Duan 已提交
318
        for col_name, col_type in self.column_dict.items():
A
Alex Duan 已提交
319
             fun_name = f"af_null_{col_name}"
A
Alex Duan 已提交
320
             sql = f'select {fun_name}({col_name}) from {self.stbname}'
A
Alex Duan 已提交
321
             tdSql.query(sql)
A
Alex Duan 已提交
322 323 324 325 326 327
             if col_type != "timestamp":
                tdSql.checkData(0, 0, "None")
             else:
                val = tdSql.getData(0, 0)
                if val is not None:
                    tdLog.exit(f" check {sql} not expect None.")
A
Alex Duan 已提交
328 329 330

        # min
        sql = f'select min(col3), af_min_int(col3) from {self.stbname}'
A
Alex Duan 已提交
331
        self.verify_same_value(sql)
A
Alex Duan 已提交
332
        sql = f'select min(col7), af_min_int(col7) from {self.stbname}'
A
Alex Duan 已提交
333
        self.verify_same_value(sql)
A
Alex Duan 已提交
334
        sql = f'select min(col9), af_min_float(col9) from {self.stbname}'
A
Alex Duan 已提交
335
        self.verify_same_value(sql)
A
Alex Duan 已提交
336 337

        # sum
A
Alex Duan 已提交
338
        sql = f'select sum(col1), af_sum_int(col1) from d0'
A
Alex Duan 已提交
339
        self.verify_same_value(sql)
A
Alex Duan 已提交
340
        sql = f'select sum(col3), af_sum_bigint(col3) from {self.stbname}'
A
Alex Duan 已提交
341
        self.verify_same_value(sql)
A
Alex Duan 已提交
342
        sql = f'select sum(col9), af_sum_float(col9) from {self.stbname}'
A
Alex Duan 已提交
343
        self.verify_same_value(sql)
A
Alex Duan 已提交
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365

        # count
        sql = f'select count(col1), af_count_int(col1) from {self.stbname}'
        self.verify_same_value(sql)
        sql = f'select count(col7), af_count_bigint(col7) from {self.stbname}'
        self.verify_same_value(sql)
        sql = f'select count(col8), af_count_float(col8) from {self.stbname}'
        self.verify_same_value(sql)

        # nest
        sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname})'
        self.verify_same_value(sql)
        # group by
        sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname} group by tbname)'
        self.verify_same_value(sql)
        # two filed expr
        sql = f'select sum(col1+col2),af_sum_float(col1+col2) from {self.stbname};'
        self.verify_same_value(sql)
        # interval
        sql = f'select af_sum_float(col2+col3),sum(col3+col2) from {self.stbname} interval(1s)'
        self.verify_same_value(sql)

A
Alex Duan 已提交
366 367 368 369
    
    # insert to child table d1 data
    def insert_data(self, tbname, rows):
        ts = 1670000000000
A
Alex Duan 已提交
370
        values = ""
A
Alex Duan 已提交
371
        batch_size = 500
A
Alex Duan 已提交
372
        child_name = ""
A
Alex Duan 已提交
373 374
        for i in range(self.child_count):
            for j in range(rows):
A
Alex Duan 已提交
375
                tj = j % 128
A
Alex Duan 已提交
376 377 378 379 380 381 382 383 384
                cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}涛思数据codepage is utf_32_le"'
                value = f'({ts+j},{cols})' 
                if values == "":
                    values = value
                else:
                    values += f",{value}"
                if j % batch_size == 0 or j + 1 == rows:
                   sql = f'insert into {tbname}{i} values {values};' 
                   tdSql.execute(sql)
A
Alex Duan 已提交
385
                   tdLog.info(f" child table={i} rows={j} insert data.")
A
Alex Duan 已提交
386
                   values = ""
A
Alex Duan 已提交
387

A
Alex Duan 已提交
388 389 390 391 392 393 394 395 396
        # partial columns upate
        sql = f'insert into {tbname}0(ts, col1, col9, col11) values(now, 100, 200, 0)'
        tdSql.execute(sql)
        sql = f'insert into {tbname}0(ts, col2, col5, col8) values(now, 100, 200, 300)'
        tdSql.execute(sql)
        sql = f'insert into {tbname}0(ts, col3, col7, col13) values(now, null, null, null)'
        tdSql.execute(sql)        
        sql = f'insert into {tbname}0(ts) values(now)'
        tdSql.execute(sql)
A
Alex Duan 已提交
397
        tdLog.info(f" insert {rows} to child table {self.child_count} .")
A
Alex Duan 已提交
398

399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416
   
    # create stream
    def create_stream(self):
        sql = f"create stream ma  into sta subtable(concat('sta_',tbname)) \
            as select _wstart,count(col1),af_count_bigint(col1) from {self.stbname} partition by tbname interval(1s);"
        tdSql.execute(sql)
        tdLog.info(sql)

    #  query stream
    def verify_stream(self):
        sql = f"select * from sta limit 10"
        self.verify_same_value(sql, 1)

    # create tmq
    def create_tmq(self):
        sql = f"create topic topa as select concat(col12,t12),sf_concat_var(col12,t12) from {self.stbname};"   
        tdSql.execute(sql)
        tdLog.info(sql)
A
Alex Duan 已提交
417

A
Alex Duan 已提交
418 419 420 421 422 423 424
    def install_taospy(self):
        tdLog.info("install taospyudf...")
        packs = ["taospyudf"]
        for pack in packs:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', pack])
        tdLog.info("install taospyudf successfully.")

A
Alex Duan 已提交
425 426
    # run
    def run(self):
A
Alex Duan 已提交
427 428
        self.install_taospy()

A
Alex Duan 已提交
429 430 431
        # var
        stable = "meters"
        tbname = "d"
A
Alex Duan 已提交
432
        count = 10
433
        rows =  5000
A
Alex Duan 已提交
434 435 436
        # do 
        self.create_table(stable, tbname, count)

A
Alex Duan 已提交
437
        # create
A
Alex Duan 已提交
438 439
        self.create_scalar_udfpy()
        self.create_aggr_udfpy()
440 441 442 443 444 445 446 447 448 449

        # create stream
        self.create_stream()

        # create tmq
        self.create_tmq()

        # insert data
        self.insert_data(tbname, rows)

A
Alex Duan 已提交
450 451
        # query
        self.query_scalar_udfpy()
A
Alex Duan 已提交
452 453
        self.query_aggr_udfpy()

A
Alex Duan 已提交
454 455
        # show performance

A
Alex Duan 已提交
456 457 458 459 460 461 462

    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)

tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())