# distribute_agg_max.py
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import random


class TDTestCase:
    """System test for ``max()`` on a super table whose sub tables span several vgroups.

    ``run`` builds the data set, verifies that the sub tables are spread over
    more than one vgroup, and then compares ``max()`` against equivalent
    ``order by ... desc limit 1`` queries and a list of fixed expectations.
    """

    # Server configuration overrides.
    # NOTE(review): nothing in this file reads this dict; presumably the test
    # runner applies it before the server starts -- confirm against the runner.
    updatecfgDict = {"maxTablesPerVnode": 2, "minTablesPerVnode": 2, "tableIncStepPerVnode": 2}

    def init(self, conn, logSql):
        """Bind ``tdSql`` to a cursor of ``conn`` and reset the per-run state.

        ``logSql`` is accepted but not used by this test.
        """
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        # Filled by check_distribute_datas(): maps a vgroup key to its sub table names.
        self.vnode_disbutes = None
        # Base timestamp (ms) for the rows inserted into the join tables.
        self.ts = 1537146000000

    @staticmethod
    def _quote_tbnames(tbnames):
        """Return ``tbnames`` as a comma-separated list of single-quoted SQL literals."""
        return ", ".join(f"'{name}'" for name in tbnames)

    def _check_max_matches_top_row(self, max_sql, same_sql):
        """Run both queries and report through ``tdLog`` whether their results are equal.

        ``max_sql`` is the ``max()`` query under test; ``same_sql`` is the
        reference query expected to return the same result set.
        """
        tdSql.query(max_sql)
        max_result = tdSql.queryResult

        tdSql.query(same_sql)
        same_result = tdSql.queryResult

        if max_result != same_result:
            tdLog.exit(" max function work not as expected, sql : %s "% max_sql)
        else:
            tdLog.info(" max function work as expected, sql : %s "% max_sql)

    def check_max_functions(self, tbname, col_name):
        """Check ``max(col_name)`` on ``tbname`` against ``order by col_name desc limit 1``."""
        max_sql = f"select max({col_name}) from {tbname};"
        same_sql = f"select {col_name} from {tbname} order by {col_name} desc limit 1"
        self._check_max_matches_top_row(max_sql, same_sql)

    def prepare_datas_of_distribute(self, dbname="testdb"):
        """Create ``dbname`` with 5 vgroups, super table ``stb1`` and sub tables ``ct1``..``ct20``, then insert rows."""
        # prepare data for 20 tables distributed over different vgroups
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname} ")
        tdSql.execute(
            f'''create table {dbname}.stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        # Sub table ct{i+1} carries tag values derived from i, so t1 runs from 0 to 19.
        for i in range(20):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')

        # ct1 gets rows 10 seconds apart, ct4 gets rows 90 days apart.
        for i in range(9):
            tdSql.execute(
                f"insert into {dbname}.ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )
            tdSql.execute(
                f"insert into {dbname}.ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )

        # Every other sub table gets 9 rows whose values are offset by the table number.
        for i in range(1, 21):
            if i in (1, 4):
                continue
            tbname = f"{dbname}.ct{i}"
            for j in range(9):
                tdSql.execute(
                    f"insert into {tbname} values ( now()-{(i+j)*10}s, {1*(j+i)}, {11111*(j+i)}, {111*(j+i)}, {11*(j)}, {1.11*(j+i)}, {11.11*(j+i)}, {(j+i)%2}, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
                )

        # Extra ct1 rows: one all-zero row, plus negative values and single NULL cells.
        tdSql.execute(f"insert into {dbname}.ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")

        # Extra ct4 rows where every data column is NULL.
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ")

        tdLog.info(" prepare data for distributed_aggregate done! ")

    def check_distribute_datas(self, dbname="testdb"):
        """Record which sub tables live in which vgroup and require a real distribution.

        Stores the mapping in ``self.vnode_disbutes`` and calls ``tdLog.exit``
        unless at least two vgroups hold two or more ``ct*`` tables.
        """
        # get vgroup_ids of all
        tdSql.query(f"show {dbname}.vgroups ")
        vnode_tables = {vgroup[0]: [] for vgroup in tdSql.queryResult}

        # check sub_table of per vnode, make sure sub_table has been distributed
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        for table in tdSql.queryResult:
            # Column 6 of a table row is used as the key into the vgroup map
            # (presumably the vgroup id); column 0 is stored as the table name.
            vnode_tables[table[6]].append(table[0])
        self.vnode_disbutes = vnode_tables

        populated = sum(1 for tables in vnode_tables.values() if len(tables) >= 2)
        if populated < 2:
            tdLog.exit(" the datas of all not satisfy sub_table has been distributed ")

    def check_max_distribute_diff_vnode(self, col_name, dbname="testdb"):
        """Check ``max(col_name)`` over one randomly chosen sub table from each well-populated vgroup.

        Relies on ``self.vnode_disbutes`` as filled by ``check_distribute_datas``.
        """
        # One random sub table from every vgroup that holds at least two of them.
        distribute_tbnames = [
            random.choice(tables)
            for tables in self.vnode_disbutes.values()
            if len(tables) >= 2
        ]
        tbname_filters = self._quote_tbnames(distribute_tbnames)

        max_sql = f"select max({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters});"
        same_sql = f"select {col_name} from {dbname}.stb1 where tbname in ({tbname_filters}) order by {col_name} desc limit 1"
        self._check_max_matches_top_row(max_sql, same_sql)

    def check_max_status(self, dbname="testdb"):
        """Run the ``max()`` consistency checks for every numeric column of ``stb1``."""
        # check max function work status
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        tablenames = [f"{dbname}.{row[0]}" for row in tdSql.queryResult]

        tdSql.query(f"desc {dbname}.stb1")
        numeric_types = ("INT", "BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE")
        colnames = [row[0] for row in tdSql.queryResult if row[1] in numeric_types]

        for tablename in tablenames:
            for colname in colnames:
                self.check_max_functions(tablename, colname)

        # check max function for different vnode
        for colname in colnames:
            # Only names starting with "c" are checked across vgroups; the original
            # author disabled the call for the remaining (tag) columns with the
            # remark "bug for tag".
            if colname.startswith("c"):
                self.check_max_distribute_diff_vnode(colname, dbname)

    def _check_partitioned_max(self, partition_key, dbname):
        """Check each ``partition by`` group's ``max(c1)`` against a direct query on the reported sub table."""
        tdSql.query(f"select max(c1),tbname from {dbname}.stb1 partition by {partition_key}")
        # Keep a local reference: the queries issued inside the loop replace tdSql.queryResult.
        partition_rows = tdSql.queryResult

        for row in partition_rows:
            tdSql.query(f"select max(c1) from {dbname}.{row[1]} ")
            tdSql.checkData(0,0,row[0])

    def distribute_agg_query(self, dbname="testdb"):
        """Run ``max()`` queries with fixed expected results against the prepared data."""
        # basic filter
        tdSql.query(f"select max(c1) from {dbname}.stb1 where c1 is null")
        tdSql.checkRows(1)

        tdSql.query(f"select max(c1) from {dbname}.stb1 where t1=1")
        tdSql.checkData(0,0,10)

        tdSql.query(f"select max(c1+c2) from {dbname}.stb1 where c1 =1 ")
        tdSql.checkData(0,0,11112.000000000)

        tdSql.query(f"select max(c1) from {dbname}.stb1 where tbname=\"ct2\"")
        tdSql.checkData(0,0,10)

        tdSql.query(f"select max(c1) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)

        tdSql.query(f"select max(c1) from {dbname}.stb1 where t1> 4  partition by tbname")
        tdSql.checkRows(15)

        # union all
        tdSql.query(f"select max(c1) from {dbname}.stb1 union all select max(c1) from {dbname}.stb1 ")
        tdSql.checkRows(2)
        tdSql.checkData(0,0,28)

        # join (uses a separate, fixed database named "db")
        tdSql.execute(" create database if not exists db ")
        tdSql.execute(" use db ")
        tdSql.execute(" create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ")
        tdSql.execute(" create table db.tb1 using db.st tags(1) ")
        tdSql.execute(" create table db.tb2 using db.st tags(2) ")

        for i in range(10):
            ts = i*10 + self.ts
            tdSql.execute(f" insert into db.tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into db.tb2 values({ts},{i},{i}.0)")

        tdSql.query(f"select max(tb1.c1), tb2.c2 from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts")
        tdSql.checkRows(1)
        tdSql.checkData(0,0,9)
        # NOTE(review): this repeats the check on column 0; tb2.c2 is column 1 and
        # is never asserted. Presumably (0,1,9.0) was intended -- confirm on a server.
        tdSql.checkData(0,0,9.00000)

        # group by
        # Switch back to the database under test rather than a hard-coded name.
        tdSql.execute(f"use {dbname} ")
        tdSql.query(f"select max(c1),c1  from {dbname}.stb1 group by t1 ")
        tdSql.checkRows(20)
        tdSql.query(f"select max(c1),c1  from {dbname}.stb1 group by c1 ")
        tdSql.checkRows(30)
        tdSql.query(f"select max(c1),c2  from {dbname}.stb1 group by c2 ")
        tdSql.checkRows(31)

        # selective common cols of datas
        tdSql.query(f"select max(c1),c2,c3,c5 from {dbname}.stb1")
        tdSql.checkRows(1)
        tdSql.checkData(0,0,28)
        tdSql.checkData(0,1,311108)
        tdSql.checkData(0,2,3108)
        tdSql.checkData(0,3,31.08000)

        tdSql.query(f"select max(c1),t1,c2,t3 from {dbname}.stb1")
        tdSql.checkRows(1)
        tdSql.checkData(0,0,28)
        tdSql.checkData(0,1,19)
        tdSql.checkData(0,2,311108)

        tdSql.query(f"select max(c1),ceil(t1),pow(c2,1)+2,abs(t3) from {dbname}.stb1")
        tdSql.checkRows(1)
        tdSql.checkData(0,0,28)
        tdSql.checkData(0,1,19)
        tdSql.checkData(0,2,311110.000000000)
        tdSql.checkData(0,3,2109)

        # partition by tbname or partition by tag
        self._check_partitioned_max("tbname", dbname)
        self._check_partitioned_max("t1", dbname)

        # nest query for support max
        tdSql.query(f"select abs(c2+2)+1 from (select max(c1) c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,31.000000000)
        tdSql.query(f"select max(c1+2)+1  as c2 from (select ts ,c1 ,c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,31.000000000)
        tdSql.query(f"select max(a+2)+1  as c2 from (select ts ,abs(c1) a ,c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,31.000000000)

        # mixup with other functions
        tdSql.query(f"select max(c1),count(c1),last(c2,c3) from {dbname}.stb1")
        tdSql.checkData(0,0,28)
        tdSql.checkData(0,1,184)
        tdSql.checkData(0,2,-99999)
        tdSql.checkData(0,3,-999)

    def run(self):
        """Execute the test: prepare data, verify distribution, then run all ``max()`` checks."""
        self.prepare_datas_of_distribute()
        self.check_distribute_datas()
        self.check_max_status()
        self.distribute_agg_query()

    def stop(self):
        """Close the ``tdSql`` connection and log success for this file."""
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)

# Register a separate TDTestCase instance for each platform entry point.
for _register in (tdCases.addWindows, tdCases.addLinux):
    _register(__file__, TDTestCase())