# udfpy_main.py
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-


from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *

import os
import random
import time


# helper that records how long each SQL statement takes
class PerfDB:
    """Run statements through ``tdSql`` and record how long each one takes.

    Every statement passed to :meth:`execute` or :meth:`query` is appended to
    ``sqls`` and its elapsed time in seconds is appended to ``spends``, so
    ``spends[i]`` is the duration of ``sqls[i]``.
    """

    def __init__(self):
        # parallel lists: sqls[i] took spends[i] seconds
        self.sqls = []
        self.spends = []

    def _timed(self, sql, call):
        """Run ``call()``, record ``sql`` with its duration, return the result.

        Nothing is recorded when ``call()`` raises; the exception propagates.
        """
        # perf_counter is monotonic, so a wall-clock adjustment while the
        # statement runs cannot yield a negative or inflated duration.
        begin = time.perf_counter()
        ret = call()
        spend = time.perf_counter() - begin

        self.sqls.append(sql)
        self.spends.append(spend)
        return ret

    # execute
    def execute(self, sql):
        """Run ``sql`` via ``tdSql.execute(sql, 1)`` and return its result."""
        print(f"  perfdb execute {sql}")
        return self._timed(sql, lambda: tdSql.execute(sql, 1))

    # query
    def query(self, sql):
        """Run ``sql`` via ``tdSql.query(sql, None, 1)`` and return its result."""
        print(f"  perfdb query {sql}")
        return self._timed(sql, lambda: tdSql.query(sql, None, 1))
    

# test case: scalar and aggregate UDFs written in Python
class TDTestCase:
    def init(self, conn, logSql, replicaVar=1):
        """Bind the connection and describe the super-table schema.

        conn: connection object; its cursor is handed to ``tdSql.init``.
        logSql: accepted for the framework's call signature, not used here.
        replicaVar: stored on the instance as an int.
        """
        self.replicaVar = int(replicaVar)
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        self.setsql = TDSetSql()

        # udf path: the "udfpy" directory next to this file
        case_dir = os.path.dirname(os.path.realpath(__file__))
        self.udf_path = case_dir + "/udfpy"

        # the same 13 data types are used, in this order, for col1..col13
        # and for t1..t13
        data_types = (
            'tinyint',
            'smallint',
            'int',
            'bigint',
            'tinyint unsigned',
            'smallint unsigned',
            'int unsigned',
            'bigint unsigned',
            'float',
            'double',
            'bool',
            'varchar(120)',
            'nchar(100)',
        )

        # columns: the timestamp first, then col1..col13
        self.column_dict = {'ts': 'timestamp'}
        for idx, data_type in enumerate(data_types, 1):
            self.column_dict[f'col{idx}'] = data_type

        # tags: t1..t13
        self.tag_dict = {
            f't{idx}': data_type for idx, data_type in enumerate(data_types, 1)
        }

    def set_stb_sql(self,stbname,column_dict,tag_dict):
        """Return the ``create stable`` statement for the given schema.

        column_dict / tag_dict map a column (or tag) name to its SQL type;
        entries are emitted in dict order, separated by ", ".
        """
        columns = ", ".join(f"{name} {dtype}" for name, dtype in column_dict.items())
        tags = ", ".join(f"{name} {dtype}" for name, dtype in tag_dict.items())
        return f'create stable {stbname} ({columns}) tags ({tags})'
    
    # create stable and child tables
    def create_table(self, stbname, tbname, count):
        """Create the super table and ``count`` child tables named ``{tbname}{i}``.

        Also remembers ``count``, ``stbname`` and ``tbname`` on the instance
        (``child_count``, ``stbname``, ``tbname``) for the later steps.
        """
        # presumably tdSql.prepare() (re)creates the database `db` - TODO confirm
        tdSql.prepare()
        tdSql.execute('use db')
        self.child_count = count
        self.stbname = stbname
        self.tbname  = tbname

        # create stable
        tdSql.execute(self.set_stb_sql(stbname, self.column_dict, self.tag_dict))

        # log progress once every `progress_step` child tables
        progress_step = 1000

        # create child table
        for idx in range(count):
            # value used for the tinyint/smallint tags, kept below 128
            small = idx % 128
            frac = f'{idx}.000{idx}'
            tag_values = ','.join(
                str(item)
                for item in (
                    small, small, idx, idx,
                    small, small, idx, idx,
                    frac, frac,
                    'true',
                    f'"var{idx}"',
                    f'"nch{idx}"',
                )
            )
            tdSql.execute(f'create table {tbname}{idx} using {stbname} tags({tag_values});')
            if idx % progress_step == 0:
                tdLog.info(f" create child table {idx} ...")

        tdLog.info(f" create {count} child tables ok.")


    # create one scalar UDF per entry of a {function name: output type} dict
    def create_sf_dicts(self, dicts, filename):
        """Create one scalar UDF per entry of ``dicts``, all from ``filename``.

        dicts: maps function name -> output type.
        filename: script file name inside ``self.udf_path``.
        """
        script = f"{self.udf_path}/{filename}"
        for name in dicts:
            statement = f' create function {name} as "{script}" outputtype {dicts[name]} language "Python" '
            tdSql.execute(statement)
            tdLog.info(statement)

    # create the scalar UDFs used by this case
    def create_scalar_udfpy(self):
        """Define the UDF name tables and create every scalar UDF.

        Sets ``self.scalar_funs`` and ``self.agg_funs`` (function name ->
        output type); only scalar functions are created here.
        """
        value_types = (
            'tinyint',
            'smallint',
            'int',
            'bigint',
            'tinyint unsigned',
            'smallint unsigned',
            'int unsigned',
            'bigint unsigned',
            'float',
            'double',
            'bool',
            'varchar(120)',
            'nchar(100)',
        )

        # scalar functions: sf0 is timestamp, sf1..sf13 follow value_types
        self.scalar_funs = {'sf0': 'timestamp'}
        self.scalar_funs.update(
            (f'sf{idx}', value_type) for idx, value_type in enumerate(value_types, 1)
        )

        # agg functions: af1..af13 follow value_types, af14 is timestamp
        self.agg_funs = {
            f'af{idx}': value_type for idx, value_type in enumerate(value_types, 1)
        }
        self.agg_funs['af14'] = 'timestamp'

        # one function per output type, all backed by sf_origin.py
        self.create_sf_dicts(self.scalar_funs, "sf_origin.py")

        # multi_args
        self.create_udf_sf("sf_multi_args", "sf_multi_args.py", "binary(1024)")

        # all type check null
        for col_name, col_type in self.column_dict.items():
            self.create_udf_sf(f"sf_null_{col_name}", "sf_null.py", col_type)

        # concat
        concat_funs = (
            ("sf_concat_var", "varchar(1024)"),
            ("sf_concat_nch", "nchar(1024)"),
        )
        for fun_name, out_type in concat_funs:
            self.create_udf_sf(fun_name, f'{fun_name}.py', out_type)
             

    # create one scalar UDF from a script file in the udf path
    def create_udf_sf(self, fun_name, file_name, out_type):
        """Create scalar UDF ``fun_name`` from ``file_name`` with ``out_type``.

        The statement is executed first and logged afterwards.
        """
        script = f"{self.udf_path}/{file_name}"
        statement = f'create function {fun_name} as "{script}" outputtype {out_type} language "Python" '
        tdSql.execute(statement)
        tdLog.info(statement)

    # create one aggregate UDF from a script file in the udf path
    def create_udf_af(self, fun_name, file_name, out_type, bufsize):
        """Create aggregate UDF ``fun_name`` from ``file_name``.

        out_type is the declared output type and bufsize the value of the
        ``bufsize`` clause. The statement is executed first and logged
        afterwards.
        """
        script = f"{self.udf_path}/{file_name}"
        statement = f'create aggregate function {fun_name} as "{script}" outputtype {out_type} bufsize {bufsize} language "Python" '
        tdSql.execute(statement)
        tdLog.info(statement)


    # the result of sql2 must equal the result of sql1
    def verify_same_result(self, sql1, sql2):
        """Check that ``sql2`` yields the same cells as ``sql1``.

        Every cell of the ``sql1`` result is compared, by position, with the
        current ``sql2`` result through ``tdSql.checkData``.
        """
        # expected values come from sql1; sql2 becomes tdSql's current result
        expected = tdSql.getResult(sql1)
        tdSql.query(sql2)

        for row_no, row in enumerate(expected):
            for col_no, cell in enumerate(row):
                tdSql.checkData(row_no, col_no, cell)

    # column 0 must equal column 1, e.g. select col1, udf_fun1(col1) from st
    def verify_same_value(self, sql):
        """Run ``sql`` and check, row by row, that column 1 equals column 0."""
        tdSql.query(sql)
        for row_no in range(tdSql.getRows()):
            tdSql.checkData(row_no, 1, tdSql.getData(row_no, 0))

    # verify multi values
    def verify_same_multi_values(self, sql):
        """Run ``sql`` and compare column 0, split on ",", with columns 1..n.

        Column 0 holds a comma-separated string; its k-th piece is checked
        against column k of the same row.
        """
        tdSql.query(sql)
        for row_no in range(tdSql.getRows()):
            pieces = tdSql.getData(row_no, 0).split(',')
            col_no = 1
            for piece in pieces:
                tdSql.checkData(row_no, col_no, piece)
                col_no += 1
            
    # query multi-args
    def query_multi_args(self):
        """Call ``sf_multi_args`` with random column/tag subsets and verify it.

        For every size from 2 up to (but not including) the number of
        candidates, a random sample of that size is passed to the UDF and its
        result is compared with the sampled columns themselves.
        """
        # all columns and tags except col13, t13 and ts
        candidates = [*self.column_dict, *self.tag_dict]
        for excluded in ("col13", "t13", "ts"):
            candidates.remove(excluded)
        print(candidates)

        for size in range(2, len(candidates)):
            picked = random.sample(candidates, size)
            print(picked)
            arg_list = ','.join(picked)
            sql = f'select sf_multi_args({arg_list}),{arg_list} from {self.stbname} limit 10'
            self.verify_same_multi_values(sql)
            tdLog.info(sql)


    # query the scalar UDFs and verify their results
    def query_scalar_udfpy(self):
        """Query every scalar UDF and verify its output."""
        # col: each column goes through the scalar function whose output type
        # equals the column type; the result must match the column itself
        for col_name, col_type in self.column_dict.items():
            for fun_name, out_type in self.scalar_funs.items():
                if col_type != out_type:
                    continue
                statements = (
                    f'select {col_name}, {fun_name}({col_name}) from {self.stbname} limit 10',
                    f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname}  limit 100) order by b,a desc',
                )
                for sql in statements:
                    tdLog.info(sql)
                    self.verify_same_value(sql)

        # multi-args
        self.query_multi_args()

        # all type check null
        for col_name, col_type in self.column_dict.items():
            sql = f'select sf_null_{col_name}({col_name}) from {self.stbname}'
            tdSql.query(sql)
            if col_type == "timestamp":
                # the timestamp result is fetched and compared with None directly
                if tdSql.getData(0, 0) is not None:
                    tdLog.exit(f" check {sql} not expect None.")
            else:
                tdSql.checkData(0, 0, "None")

        # concat: the UDF must agree with the built-in concat()
        concat_checks = (
            f'select sf_concat_var(col12, t12), concat(col12, t12) from {self.stbname} limit 1000',
            f'select sf_concat_nch(col13, t13), concat(col13, t13) from {self.stbname} limit 1000',
        )
        for sql in concat_checks:
            self.verify_same_value(sql)

    # create the aggregate UDFs
    def create_aggr_udfpy(self):
        """Create every aggregate UDF used by the aggregate queries."""
        bufsize = 200 * 1024

        # all type check null
        for col_name, col_type in self.column_dict.items():
            self.create_udf_af(f"af_null_{col_name}", "af_null.py", col_type, bufsize)

        # each family is one script (af_<kind>.py) registered once per output
        # type under the name af_<kind>_<type>
        families = (
            ("min", ("float", "int")),
            ("sum", ("float", "int", "bigint")),
            ("count", ("float", "int", "bigint")),
        )
        for kind, out_types in families:
            for out_type in out_types:
                self.create_udf_af(f"af_{kind}_{out_type}", f"af_{kind}.py", out_type, bufsize)


    # query the aggregate UDFs and verify their results
    def query_aggr_udfpy(self) :
        """Query every aggregate UDF and compare it with the built-in aggregate.

        Reads ``self.stbname`` and ``self.tbname`` (both set by
        ``create_table``) and ``self.column_dict``.
        """
        # all type check null
        for col_name, col_type in self.column_dict.items():
            sql = f'select af_null_{col_name}({col_name}) from {self.stbname}'
            tdSql.query(sql)
            if col_type != "timestamp":
                tdSql.checkData(0, 0, "None")
            elif tdSql.getData(0, 0) is not None:
                # the timestamp result is fetched and compared with None directly
                tdLog.exit(f" check {sql} not expect None.")

        stb = self.stbname
        # First child table. The name is derived from the prefix given to
        # create_table instead of being hard-coded as "d0", so the check still
        # targets an existing table when a different prefix is used.
        first_child = f'{self.tbname}0'

        checks = [
            # min
            f'select min(col3), af_min_int(col3) from {stb}',
            f'select min(col7), af_min_int(col7) from {stb}',
            f'select min(col9), af_min_float(col9) from {stb}',
            # sum
            # NOTE(review): sum(col1) runs on a single child table only; the
            # reason is not visible here (possibly the int output range).
            f'select sum(col1), af_sum_int(col1) from {first_child}',
            f'select sum(col3), af_sum_bigint(col3) from {stb}',
            f'select sum(col9), af_sum_float(col9) from {stb}',
            # count
            f'select count(col1), af_count_int(col1) from {stb}',
            f'select count(col7), af_count_bigint(col7) from {stb}',
            f'select count(col8), af_count_float(col8) from {stb}',
            # nest
            f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {stb})',
            # group by
            f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {stb} group by tbname)',
            # two field expr
            f'select sum(col1+col2),af_sum_float(col1+col2) from {stb};',
            # interval
            f'select af_sum_float(col2+col3),sum(col3+col2) from {stb} interval(1s)',
        ]
        # every statement returns the built-in and the UDF side by side in
        # columns 0 and 1, which must be equal on each row
        for sql in checks:
            self.verify_same_value(sql)


    # insert data into every child table
    def insert_data(self, tbname, rows):
        """Insert ``rows`` rows into each child table ``{tbname}{i}``.

        tbname: child table name prefix; tables 0..self.child_count-1 are filled.
        rows: number of rows per child table; row j gets timestamp ``ts + j``.

        Rows are sent in multi-row INSERT statements of at most ``batch_size``
        rows. Afterwards four partial-column rows are written to
        ``{tbname}0``.
        """
        ts = 1670000000000
        batch_size = 500
        for i in range(self.child_count):
            batch = []
            for j in range(rows):
                # value for the tinyint/smallint columns, kept below 128
                tj = j % 128
                cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}涛思数据codepage is utf_32_le"'
                batch.append(f'({ts+j},{cols})')
                # Flush when the batch is full or on the last row. Testing
                # the batch length rather than `j % batch_size == 0` avoids
                # sending a one-row statement for j == 0.
                if len(batch) == batch_size or j + 1 == rows:
                    values = ",".join(batch)
                    sql = f'insert into {tbname}{i} values {values};'
                    tdSql.execute(sql)
                    tdLog.info(f" child table={i} rows={j} insert data.")
                    batch = []

        # partial columns update
        sql = f'insert into {tbname}0(ts, col1, col9, col11) values(now, 100, 200, 0)'
        tdSql.execute(sql)
        sql = f'insert into {tbname}0(ts, col2, col5, col8) values(now, 100, 200, 300)'
        tdSql.execute(sql)
        sql = f'insert into {tbname}0(ts, col3, col7, col13) values(now, null, null, null)'
        tdSql.execute(sql)
        sql = f'insert into {tbname}0(ts) values(now)'
        tdSql.execute(sql)
        tdLog.info(f" insert {rows} to child table {self.child_count} .")


    # run the whole case: create tables, load data, create and query UDFs
    def run(self):
        """Run the case: build tables, load data, create the UDFs, query them."""
        # var
        stable, child_prefix = "meters", "d"
        child_tables, rows_per_table = 10, 50000

        # do
        self.create_table(stable, child_prefix, child_tables)
        self.insert_data(child_prefix, rows_per_table)

        # create both UDF kinds first, then query them in the same order
        steps = (
            self.create_scalar_udfpy,
            self.create_aggr_udfpy,
            self.query_scalar_udfpy,
            self.query_aggr_udfpy,
        )
        for step in steps:
            step()

        # show performance (nothing is reported here)


    # close tdSql and log success
    def stop(self):
        """Close ``tdSql`` and log that this file finished successfully."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

# Register this file with tdCases through both addWindows and addLinux;
# each registration gets its own TDTestCase instance.
for _register in (tdCases.addWindows, tdCases.addLinux):
    _register(__file__, TDTestCase())