twa.py 6.5 KB
Newer Older
W
wenzhouwww@live.cn 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import random ,os ,sys
import platform
import math

class TDTestCase:
    """Test case running `twa()` queries over child tables spread across vgroups.

    The case creates a super table with `tb_nums` child tables in a database
    configured with 5 vgroups, verifies the child tables really are spread
    over several vgroups, and then checks `twa()` results for plain,
    partitioned, union, join and mixed-function queries.
    """

    # Configuration overrides (debug flags and per-vnode table limits);
    # presumably picked up by the test framework before the case runs.
    updatecfgDict = {
        'debugFlag': 143, "cDebugFlag": 143, "uDebugFlag": 143, "rpcDebugFlag": 143, "tmrDebugFlag": 143,
        "jniDebugFlag": 143, "simDebugFlag": 143, "dDebugFlag": 143, "vDebugFlag": 143, "mDebugFlag": 143,
        "qDebugFlag": 143, "wDebugFlag": 143, "sDebugFlag": 143, "tsdbDebugFlag": 143, "tqDebugFlag": 143,
        "fsDebugFlag": 143, "fnDebugFlag": 143,
        "maxTablesPerVnode": 2, "minTablesPerVnode": 2, "tableIncStepPerVnode": 2,
    }

    def init(self, conn, logSql):
        """Bind `tdSql` to a cursor of `conn` and set the data-set parameters.

        Args:
            conn: connection object; only `conn.cursor()` is used here.
            logSql: accepted for interface compatibility; not used by this case.
        """
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        self.vnode_disbutes = None   # filled by check_distribute_datas(): {vgroup id: [table names]}
        self.ts = 1537146000000      # base timestamp for inserted rows
        self.tb_nums = 20            # number of child tables
        self.row_nums = 100          # rows inserted per child table
        self.time_step = 1000        # timestamp increment unit

    @staticmethod
    def _population_stddev(values):
        """Return the population standard deviation of `values`.

        Values are converted to Python floats first, so narrow integer
        dtypes (e.g. numpy int32) cannot overflow while being summed.

        Raises:
            ValueError: if `values` is empty.
        """
        samples = [float(v) for v in values]
        if not samples:
            raise ValueError("cannot compute stddev of an empty sequence")
        mean = math.fsum(samples) / len(samples)
        variance = math.fsum((s - mean) ** 2 for s in samples) / len(samples)
        return math.sqrt(variance)

    def check_stddev_functions(self, tbname, col_name):
        """Compare `stddev(col_name)` on `tbname` with a locally computed value.

        The expected value is the population standard deviation of all
        non-null values of the column. The result is reported through
        `tdLog.info` when both agree within 0.0001, otherwise through
        `tdLog.exit`.

        Args:
            tbname: table name interpolated into the queries.
            col_name: column name interpolated into the queries.
        """
        stddev_sql = f"select stddev({col_name}) from {tbname};"
        same_sql = f"select {col_name} from {tbname} where {col_name} is not null "

        tdSql.query(same_sql)
        raw = np.array(tdSql.queryResult)
        # Elementwise comparison: flattens the row tuples and drops any None.
        pre_data = raw[raw != None]  # noqa: E711
        if len(pre_data) == 0:
            tdLog.exit(" sql:%s; no non-null rows to compute the expected stddev" % same_sql)
            return

        # Iterate the flattened values, not the row tuples of queryResult.
        stddev_result = self._population_stddev(pre_data)

        tdSql.query(stddev_sql)
        actual = tdSql.queryResult[0][0]

        # %f keeps the fractional part that %d would silently truncate.
        if abs(actual - stddev_result) < 0.0001:
            tdLog.info(" sql:%s; row:0 col:0 data:%f , expect:%f" % (stddev_sql, actual, stddev_result))
        else:
            tdLog.exit(" sql:%s; row:0 col:0 data:%f , expect:%f" % (stddev_sql, actual, stddev_result))

    def prepare_datas_of_distribute(self):
        """Create database `testdb`, super table `stb1` and its child tables.

        Each of the `tb_nums` child tables receives `row_nums` rows with
        identical column values; `ct1` additionally gets three all-NULL rows.
        """
        # prepare data for 20 tables distributed over different vgroups
        tdSql.execute("create database if not exists testdb keep 3650 duration 1000 vgroups 5")
        tdSql.execute(" use testdb ")
        tdSql.execute(
            '''create table stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        for i in range(self.tb_nums):
            tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
            ts = self.ts
            for j in range(self.row_nums):
                # NOTE: the offset accumulates, so row j lands at
                # self.ts + time_step * j*(j+1)/2; timestamps stay distinct.
                ts += j * self.time_step
                tdSql.execute(
                    f"insert into ct{i+1} values({ts}, 1, 11111, 111, 1, 1.11, 11.11, 2, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
                )

        # all-NULL rows at timestamps far from the regular data
        tdSql.execute("insert into ct1 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute("insert into ct1 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute("insert into ct1 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ")

        tdLog.info(" prepare data for distributed_aggregate done! ")

    def check_distribute_datas(self):
        """Verify that the child tables are spread over at least two vgroups.

        Builds a `{vgroup id: [table names]}` mapping, stores it in
        `self.vnode_disbutes`, and calls `tdLog.exit` unless at least two
        vgroups hold two or more tables each.
        """
        # one empty bucket per vgroup id (first column of "show vgroups")
        tdSql.query("show vgroups ")
        vnode_tables = {vgroup[0]: [] for vgroup in tdSql.queryResult}

        # Bucket each child table by the value in column 6 of "show tables"
        # (presumably the vgroup id - verify against the server's output layout).
        tdSql.query("show tables like 'ct%'")
        for table_row in tdSql.queryResult:
            vnode_tables[table_row[6]].append(table_row[0])
        self.vnode_disbutes = vnode_tables

        count = sum(1 for tables in vnode_tables.values() if len(tables) >= 2)
        if count < 2:
            tdLog.exit(" the datas of all not satisfy sub_table has been distributed ")

    def distribute_twa_query(self):
        """Run `twa()` queries against the prepared data and check the results."""
        # basic filter
        tdSql.query(" select twa(c1) from ct1  ")
        tdSql.checkData(0,0,1.000000000)

        tdSql.query(" select twa(c1) from stb1 partition by tbname  ")
        tdSql.checkRows(self.tb_nums)
        tdSql.checkData(0,0,1.000000000)

        tdSql.query(" select twa(c2) from stb1 group by tbname ")
        tdSql.checkRows(self.tb_nums)
        tdSql.checkData(0,0,11111.000000000)

        tdSql.query("select twa(c1+c2) from stb1 partition by tbname ")
        tdSql.checkData(0,0,11112.000000000)

        tdSql.query("select twa(c1) from stb1 partition by t1")
        tdSql.checkRows(self.tb_nums)
        tdSql.checkData(0,0,1.000000000)

        # union all: both halves return one row per child table
        tdSql.query(" select twa(c1) from stb1 partition by tbname union all select twa(c1) from stb1 partition by tbname ")
        tdSql.checkRows(2 * self.tb_nums)
        tdSql.checkData(0,0,1.000000000)

        # join: two small tables in a separate database with identical rows
        tdSql.execute(" create database if not exists db ")
        tdSql.execute(" use db ")
        tdSql.execute(" create stable st (ts timestamp , c1 int ,c2 float) tags(t1 int) ")
        tdSql.execute(" create table tb1 using st tags(1) ")
        tdSql.execute(" create table tb2 using st tags(2) ")

        for i in range(10):
            ts = i*10 + self.ts
            tdSql.execute(f" insert into tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into tb2 values({ts},{i},{i}.0)")

        tdSql.query(" select twa(tb1.c1), twa(tb2.c2) from tb1, tb2 where tb1.ts=tb2.ts ")
        tdSql.checkRows(1)
        tdSql.checkData(0,0,4.500000000)
        tdSql.checkData(0,1,4.500000000)

        # switch back to the distributed database for the remaining queries
        tdSql.execute(" use testdb ")

        # mixup with other functions
        tdSql.query(" select twa(c1),twa(c2),max(c1),elapsed(ts) from stb1 ")
        tdSql.checkData(0,0,1.000000000)
        tdSql.checkData(0,1,11111.000000000)
        tdSql.checkData(0,2,1)

    def run(self):
        """Prepare the data, verify its distribution, then run the twa checks."""
        self.prepare_datas_of_distribute()
        self.check_distribute_datas()
        self.distribute_twa_query()

    def stop(self):
        """Close `tdSql` and log successful completion."""
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)

# Register a fresh instance of this case for each platform (Windows first, then Linux).
for _register_name in ("addWindows", "addLinux"):
    getattr(tdCases, _register_name)(__file__, TDTestCase())