# distribute_agg_avg.py
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import random ,os ,sys
import platform


class TDTestCase:
    """System test for ``avg()`` over a super table whose child tables span several vgroups."""

    # Config overrides exposed to the test framework. The small per-vnode table
    # limits are presumably meant to force the child tables onto several vnodes
    # (TODO confirm these options are still honoured by the target server).
    updatecfgDict = {"maxTablesPerVnode":2 ,"minTablesPerVnode":2,"tableIncStepPerVnode":2 }

    # Column types (as reported by `desc`) whose avg() is verified.
    _NUMERIC_TYPES = ("INT", "BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE")

    def init(self, conn, logSql):
        """Bind the shared SQL helper to ``conn`` and reset per-run state."""
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        # vgroup id -> list of child-table names; filled by check_distribute_datas().
        self.vnode_disbutes = None
        # Base timestamp for the rows inserted into the join tables in distribute_agg_query().
        self.ts = 1537146000000

    @staticmethod
    def _expected_avg(rows):
        """Return the client-side average of all non-NULL cells in ``rows``.

        ``rows`` is a query result (a sequence of row tuples). ``None`` cells
        are dropped. Integer data is widened to int64 before summing, because
        the platform default integer (int32 on Windows) can overflow. Returns
        ``None`` when no value remains, instead of dividing by zero. The
        assumption is that tdSql.checkData accepts ``None`` as "expect NULL";
        confirm against util/sql.py.
        """
        data = np.array(rows)
        values = data[data != None]  # noqa: E711 -- numpy element-wise NULL filter
        if values.size == 0:
            return None
        if np.issubdtype(values.dtype, np.integer):
            values = values.astype(np.int64)
        return np.sum(values) / len(values)

    @staticmethod
    def _tbname_in_list(tbnames):
        """Render ``tbnames`` as the quoted, comma-separated body of an SQL ``IN (...)``."""
        return ", ".join(f"'{name}'" for name in tbnames)

    def _check_avg_against(self, avg_sql, same_sql):
        """Run ``same_sql`` to compute the expected average client-side, then check ``avg_sql``'s result."""
        tdSql.query(same_sql)
        expected = self._expected_avg(tdSql.queryResult)
        tdSql.query(avg_sql)
        tdSql.checkData(0, 0, expected)

    def check_avg_functions(self, tbname , col_name):
        """Verify ``avg(col_name)`` on table ``tbname`` against a client-side average."""
        avg_sql = f"select avg({col_name}) from {tbname};"
        same_sql = f"select {col_name} from {tbname} where {col_name} is not null "
        self._check_avg_against(avg_sql, same_sql)

    def prepare_datas_of_distribute(self, dbname="testdb"):
        """Create ``dbname`` (5 vgroups) with super table ``stb1`` and 20 populated child tables."""
        # prepare data for 20 child tables distributed over different vgroups
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname} ")
        tdSql.execute(
            f'''create table {dbname}.stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        for i in range(20):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')

        # ct1: 9 rows 10s apart; ct4: 9 rows 90 days apart.
        for i in range(9):
            tdSql.execute(
                f"insert into {dbname}.ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )
            tdSql.execute(
                f"insert into {dbname}.ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )

        # Every other child table gets 9 rows whose values depend on the table index.
        for i in range(1, 21):
            if i in (1, 4):
                continue  # ct1 and ct4 were filled above
            tbname = f"{dbname}.ct{i}"
            for j in range(9):
                tdSql.execute(
                    f"insert into {tbname} values ( now()-{(i+j)*10}s, {1*(j+i)}, {11111*(j+i)}, {111*(j+i)}, {11*(j)}, {1.11*(j+i)}, {11.11*(j+i)}, {(j+i)%2}, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
                )

        # Extra ct1 rows: zeros, negative values, and partial NULLs.
        tdSql.execute(f"insert into {dbname}.ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")

        # Extra ct4 rows: all-NULL rows at distant timestamps.
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ")

        tdLog.info(" prepare data for distributed_aggregate done! ")

    def check_distribute_datas(self, dbname="testdb"):
        """Map each vgroup to its child tables and require at least two vgroups holding 2+ tables.

        The mapping is stored in ``self.vnode_disbutes``. The test exits through
        ``tdLog.exit`` when the tables are not distributed enough.
        """
        tdSql.query(f"show {dbname}.vgroups ")
        vnode_tables = {vgroup[0]: [] for vgroup in tdSql.queryResult}

        tdSql.query(f"show {dbname}.tables like 'ct%'")
        for table in tdSql.queryResult:
            # column 6 is read as the vgroup id of the table (per the `show tables` layout)
            vnode_tables[table[6]].append(table[0])
        self.vnode_disbutes = vnode_tables

        shared_vgroups = sum(1 for tables in vnode_tables.values() if len(tables) >= 2)
        if shared_vgroups < 2:
            tdLog.exit(" the datas of all not satisfy sub_table has been distributed ")

    def check_avg_distribute_diff_vnode(self,col_name, dbname="testdb"):
        """Verify ``avg(col_name)`` over one random child table from each vgroup holding 2+ tables.

        Requires ``check_distribute_datas`` to have populated ``self.vnode_disbutes``.
        """
        distribute_tbnames = [
            random.choice(tables)
            for tables in self.vnode_disbutes.values()
            if len(tables) >= 2
        ]
        tbname_filters = self._tbname_in_list(distribute_tbnames)

        avg_sql = f"select avg({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters});"
        same_sql = f"select {col_name}  from {dbname}.stb1 where tbname in ({tbname_filters}) and {col_name} is not null "
        self._check_avg_against(avg_sql, same_sql)

    def check_avg_status(self, dbname="testdb"):
        """Check avg() of every numeric column per child table, then across vgroups."""
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        tablenames = [f"{dbname}.{row[0]}" for row in tdSql.queryResult]

        tdSql.query(f"desc {dbname}.stb1")
        colnames = [row[0] for row in tdSql.queryResult if row[1] in self._NUMERIC_TYPES]

        for tablename in tablenames:
            for colname in colnames:
                self.check_avg_functions(tablename, colname)

        # Cross-vgroup avg() check on data columns (c*) only.
        # TODO: also cover tag columns (t*) once the reported avg-over-tag bug is fixed.
        for colname in colnames:
            if colname.startswith("c"):
                self.check_avg_distribute_diff_vnode(colname, dbname)

    def distribute_agg_query(self, dbname="testdb"):
        """Check avg() under filters, union all, join, partition, nested queries and mixed aggregates."""
        # basic filter
        tdSql.query(f"select avg(c1) from {dbname}.stb1 ")
        tdSql.checkData(0, 0, 14.086956522)

        tdSql.query(f"select avg(a) from (select avg(c1) a  from {dbname}.stb1 partition by tbname) ")
        tdSql.checkData(0, 0, 14.292307692)

        tdSql.query(f"select avg(c1) from {dbname}.stb1 where t1=1")
        tdSql.checkData(0, 0, 6.000000000)

        tdSql.query(f"select avg(c1+c2) from {dbname}.stb1 where c1 =1 ")
        tdSql.checkData(0, 0, 11112.000000000)

        tdSql.query(f"select avg(c1) from {dbname}.stb1 where tbname=\"ct2\"")
        tdSql.checkData(0, 0, 6.000000000)

        tdSql.query(f"select avg(c1) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)

        tdSql.query(f"select avg(c1) from {dbname}.stb1 where t1> 4  partition by tbname")
        tdSql.checkRows(15)

        # union all
        tdSql.query(f"select avg(c1) from {dbname}.stb1 union all select avg(c1) from {dbname}.stb1 ")
        tdSql.checkRows(2)
        tdSql.checkData(0, 0, 14.086956522)

        tdSql.query(f"select avg(a) from (select avg(c1) a from {dbname}.stb1 union all select avg(c1) a  from {dbname}.stb1)")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, 14.086956522)

        # join: two small tables in a separate database `db`
        tdSql.execute(" create database if not exists db ")
        tdSql.execute(" use db ")
        tdSql.execute(" create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ")
        tdSql.execute(" create table db.tb1 using db.st tags(1) ")
        tdSql.execute(" create table db.tb2 using db.st tags(2) ")

        for i in range(10):
            ts = i * 10 + self.ts
            tdSql.execute(f" insert into db.tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into db.tb2 values({ts},{i},{i}.0)")

        tdSql.query(f"select avg(tb1.c1), avg(tb2.c2) from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, 4.500000000)
        tdSql.checkData(0, 1, 4.500000000)

        # switch back to the distributed test database
        tdSql.execute(f" use {dbname} ")

        # partition by tbname or partition by tag
        tdSql.query(f"select avg(c1) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)

        # nested queries with avg()
        tdSql.query(f"select avg(c2+2)+1 from (select avg(c1) c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 17.086956522)
        tdSql.query(f"select avg(c1+2)  as c2 from (select ts ,c1 ,c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 16.086956522)
        tdSql.query(f"select avg(a+2)  as c2 from (select ts ,abs(c1) a ,c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 16.086956522)

        # mixup with other functions; last(c2,c3) yields two columns, so avg(c1) is column 5
        tdSql.query(f"select max(c1),count(c1),last(c2,c3),sum(c1+c2),avg(c1) from {dbname}.stb1")
        tdSql.checkData(0, 0, 28)
        tdSql.checkData(0, 1, 184)
        tdSql.checkData(0, 2, -99999)
        tdSql.checkData(0, 3, -999)
        tdSql.checkData(0, 4, 28202310.000000000)
        tdSql.checkData(0, 5, 14.086956522)

    def run(self):
        """Prepare the data, verify its distribution, then run all avg() checks."""
        self.prepare_datas_of_distribute()
        self.check_distribute_datas()
        self.check_avg_status()
        self.distribute_agg_query()

    def stop(self):
        """Close the SQL helper and report success."""
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)

# Register this case with the framework: one fresh TDTestCase per platform hook.
for _register in (tdCases.addWindows, tdCases.addLinux):
    _register(__file__, TDTestCase())