distribute_agg_count.py 10.5 KB
Newer Older
1 2 3 4
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
C
cpwu 已提交
5
import random
6 7 8 9


class TDTestCase:

C
cpwu 已提交
10
    updatecfgDict = {"maxTablesPerVnode":2 ,"minTablesPerVnode":2,"tableIncStepPerVnode":2 }
11
    def init(self, conn, logSql, replicaVar=1):
12 13 14 15 16 17 18 19 20 21 22 23 24
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        self.vnode_disbutes = None
        self.ts = 1537146000000


    def check_count_functions(self, tbname , col_name):

        max_sql = f"select count({col_name}) from {tbname};"

        same_sql = f"select sum(c) from (select {col_name} ,1 as c  from {tbname} where {col_name} is not null) "

        tdSql.query(max_sql)
C
cpwu 已提交
25
        max_result = tdSql.queryResult
26 27 28 29 30 31 32 33 34

        tdSql.query(same_sql)
        same_result = tdSql.queryResult

        if max_result !=same_result:
            tdLog.exit(" count function work not as expected, sql : %s "% max_sql)
        else:
            tdLog.info(" count function work as expected, sql : %s "% max_sql)

C
cpwu 已提交
35
    def prepare_datas_of_distribute(self, dbname="testdb"):
36 37

        # prepate datas for  20 tables distributed at different vgroups
C
cpwu 已提交
38 39
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname} ")
40
        tdSql.execute(
C
cpwu 已提交
41
            f'''create table {dbname}.stb1
42 43 44 45 46 47
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        for i in range(20):
C
cpwu 已提交
48
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
49 50 51

        for i in range(9):
            tdSql.execute(
C
cpwu 已提交
52
                f"insert into {dbname}.ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
53 54
            )
            tdSql.execute(
C
cpwu 已提交
55
                f"insert into {dbname}.ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
56 57 58 59 60 61
            )

        for i in range(1,21):
            if i ==1 or i == 4:
                continue
            else:
C
cpwu 已提交
62
                tbname = f"{dbname}.ct{i}"
63 64 65 66
                for j in range(9):
                    tdSql.execute(
                f"insert into {tbname} values ( now()-{(i+j)*10}s, {1*(j+i)}, {11111*(j+i)}, {111*(j+i)}, {11*(j)}, {1.11*(j+i)}, {11.11*(j+i)}, {(j+i)%2}, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
            )
C
cpwu 已提交
67 68 69 70
        tdSql.execute(f"insert into {dbname}.ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
71

C
cpwu 已提交
72 73 74
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ")
75 76 77

        tdLog.info(" prepare data for distributed_aggregate done! ")

C
cpwu 已提交
78
    def check_distribute_datas(self, dbname="testdb"):
79
        # get vgroup_ids of all
C
cpwu 已提交
80
        tdSql.query(f"show {dbname}.vgroups ")
81 82 83
        vgroups = tdSql.queryResult

        vnode_tables={}
C
cpwu 已提交
84

85 86
        for vgroup_id in vgroups:
            vnode_tables[vgroup_id[0]]=[]
C
cpwu 已提交
87

88 89

        # check sub_table of per vnode ,make sure sub_table has been distributed
X
Xiaoyu Wang 已提交
90
        tdSql.query(f"select * from information_schema.ins_tables where db_name = '{dbname}' and table_name like 'ct%'")
91 92 93
        table_names = tdSql.queryResult
        tablenames = []
        for table_name in table_names:
C
cpwu 已提交
94
            vnode_tables[table_name[6]].append(table_name[0])
95 96 97 98 99 100 101 102 103
        self.vnode_disbutes = vnode_tables

        count = 0
        for k ,v in vnode_tables.items():
            if len(v)>=2:
                count+=1
        if count < 2:
            tdLog.exit(" the datas of all not satisfy sub_table has been distributed ")

C
cpwu 已提交
104 105
    def check_count_distribute_diff_vnode(self,col_name, dbname="testdb"):

106 107 108 109
        vgroup_ids = []
        for k ,v in self.vnode_disbutes.items():
            if len(v)>=2:
                vgroup_ids.append(k)
C
cpwu 已提交
110

111
        distribute_tbnames = []
C
cpwu 已提交
112

113 114 115 116 117 118 119 120 121
        for vgroup_id in vgroup_ids:
            vnode_tables = self.vnode_disbutes[vgroup_id]
            distribute_tbnames.append(random.sample(vnode_tables,1)[0])
        tbname_ins = ""
        for tbname in distribute_tbnames:
            tbname_ins += "'%s' ,"%tbname

        tbname_filters = tbname_ins[:-1]

C
cpwu 已提交
122 123 124
        max_sql = f"select count({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters});"

        same_sql = f"select sum(c) from (select {col_name} ,1 as c  from {dbname}.stb1 where tbname in ({tbname_filters}) and {col_name} is not null) "
125 126

        tdSql.query(max_sql)
C
cpwu 已提交
127
        max_result = tdSql.queryResult
128 129 130 131 132 133 134 135 136

        tdSql.query(same_sql)
        same_result = tdSql.queryResult

        if max_result !=same_result:
            tdLog.exit(" count function work not as expected, sql : %s "% max_sql)
        else:
            tdLog.info(" count function work as expected, sql : %s "% max_sql)

C
cpwu 已提交
137 138 139 140
    def check_count_status(self, dbname="testdb"):
        # check max function work status

        tdSql.query(f"show {dbname}.tables like 'ct%'")
141 142 143
        table_names = tdSql.queryResult
        tablenames = []
        for table_name in table_names:
C
cpwu 已提交
144
            tablenames.append(f"{dbname}.{table_name[0]}")
145

C
cpwu 已提交
146
        tdSql.query(f"desc {dbname}.stb1")
147
        col_names = tdSql.queryResult
C
cpwu 已提交
148

149 150 151 152
        colnames = []
        for col_name in col_names:
            if col_name[1] in ["INT" ,"BIGINT" ,"SMALLINT" ,"TINYINT" , "FLOAT" ,"DOUBLE"]:
                colnames.append(col_name[0])
C
cpwu 已提交
153

154 155 156 157
        for tablename in tablenames:
            for colname in colnames:
                self.check_count_functions(tablename,colname)

C
cpwu 已提交
158
        # check max function for different vnode
159 160 161

        for colname in colnames:
            if colname.startswith("c"):
C
cpwu 已提交
162
                self.check_count_distribute_diff_vnode(colname, dbname)
163
            else:
C
cpwu 已提交
164
                # self.check_count_distribute_diff_vnode(colname, dbname) # bug for tag
165 166
                pass

C
cpwu 已提交
167
    def distribute_agg_query(self, dbname="testdb"):
168
        # basic filter
C
cpwu 已提交
169
        tdSql.query(f"select count(c1) from {dbname}.stb1 ")
170 171
        tdSql.checkData(0,0,184)

C
cpwu 已提交
172
        tdSql.query(f"select count(c1) from {dbname}.stb1 where t1=1")
173 174
        tdSql.checkData(0,0,9)

C
cpwu 已提交
175
        tdSql.query(f"select count(c1+c2) from {dbname}.stb1 where c1 =1 ")
176 177
        tdSql.checkData(0,0,2)

C
cpwu 已提交
178
        tdSql.query(f"select count(c1) from {dbname}.stb1 where tbname=\"ct2\"")
179 180
        tdSql.checkData(0,0,9)

C
cpwu 已提交
181
        tdSql.query(f"select count(c1) from {dbname}.stb1 partition by tbname")
182 183
        tdSql.checkRows(20)

C
cpwu 已提交
184
        tdSql.query(f"select count(c1) from {dbname}.stb1 where t1> 4  partition by tbname")
185 186
        tdSql.checkRows(15)

C
cpwu 已提交
187 188
        # union all
        tdSql.query(f"select count(c1) from {dbname}.stb1 union all select count(c1) from {dbname}.stb1 ")
189 190 191
        tdSql.checkRows(2)
        tdSql.checkData(0,0,184)

C
cpwu 已提交
192
        # join
193 194 195

        tdSql.execute(" create database if not exists db ")
        tdSql.execute(" use db ")
C
cpwu 已提交
196 197 198 199
        tdSql.execute(" create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ")
        tdSql.execute(" create table db.tb1 using db.st tags(1) ")
        tdSql.execute(" create table db.tb2 using db.st tags(2) ")

200 201 202

        for i in range(10):
            ts = i*10 + self.ts
C
cpwu 已提交
203 204
            tdSql.execute(f" insert into db.tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into db.tb2 values({ts},{i},{i}.0)")
205

C
cpwu 已提交
206
        tdSql.query(f"select count(tb1.c1), count(tb2.c2) from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts")
207 208 209 210
        tdSql.checkRows(1)
        tdSql.checkData(0,0,10)
        tdSql.checkData(0,1,10)

C
cpwu 已提交
211 212
        # group by
        tdSql.execute(f" use {dbname} ")
213

C
cpwu 已提交
214
        tdSql.query(f"select count(*)  from {dbname}.stb1 ")
215
        tdSql.checkData(0,0,187)
C
cpwu 已提交
216
        tdSql.query(f"select count(*)  from {dbname}.stb1 group by t1 ")
217
        tdSql.checkRows(20)
C
cpwu 已提交
218
        tdSql.query(f"select count(*)  from {dbname}.stb1 group by c1 ")
219
        tdSql.checkRows(30)
C
cpwu 已提交
220
        tdSql.query(f"select count(*)  from {dbname}.stb1 group by c2 ")
221 222 223
        tdSql.checkRows(31)

        # partition by tbname or partition by tag
C
cpwu 已提交
224
        tdSql.query(f"select max(c1),tbname from {dbname}.stb1 partition by tbname")
225
        query_data = tdSql.queryResult
C
cpwu 已提交
226

227
        for row in query_data:
C
cpwu 已提交
228 229
            tbname = f"{dbname}.{row[1]}"
            tdSql.query(f"select max(c1) from %s "%tbname)
230 231
            tdSql.checkData(0,0,row[0])

C
cpwu 已提交
232
        tdSql.query(f"select max(c1),tbname from {dbname}.stb1 partition by t1")
233
        query_data = tdSql.queryResult
C
cpwu 已提交
234

235
        for row in query_data:
C
cpwu 已提交
236 237
            tbname = f"{dbname}.{row[1]}"
            tdSql.query(f"select max(c1) from %s "%tbname)
238 239 240
            tdSql.checkData(0,0,row[0])

        # nest query for support max
C
cpwu 已提交
241
        tdSql.query(f"select abs(c2+2)+1 from (select count(c1) c2  from {dbname}.stb1)")
242
        tdSql.checkData(0,0,187.000000000)
C
cpwu 已提交
243
        tdSql.query(f"select count(c1+2)  as c2 from (select ts ,c1 ,c2  from {dbname}.stb1)")
244
        tdSql.checkData(0,0,184)
C
cpwu 已提交
245
        tdSql.query(f"select count(a+2)  as c2 from (select ts ,abs(c1) a ,c2  from {dbname}.stb1)")
246 247 248
        tdSql.checkData(0,0,184)

        # mixup with other functions
C
cpwu 已提交
249
        tdSql.query(f"select max(c1),count(c1),last(c2,c3) from {dbname}.stb1")
250 251 252 253 254 255 256 257 258 259 260 261
        tdSql.checkData(0,0,28)
        tdSql.checkData(0,1,184)
        tdSql.checkData(0,2,-99999)
        tdSql.checkData(0,3,-999)

    def run(self):

        self.prepare_datas_of_distribute()
        self.check_distribute_datas()
        self.check_count_status()
        self.distribute_agg_query()

C
cpwu 已提交
262

263 264 265 266 267 268
    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)

tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())