distribute_agg_count.py 10.5 KB
Newer Older
1 2 3 4
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
C
cpwu 已提交
5
import random
6 7 8 9


class TDTestCase:

C
cpwu 已提交
10
    updatecfgDict = {"maxTablesPerVnode":2 ,"minTablesPerVnode":2,"tableIncStepPerVnode":2 }
11
    def init(self, conn, logSql, replicaVar=1):
        """Set up the shared SQL helper and the per-case state.

        Args:
            conn: Connection object; its ``cursor()`` is passed to ``tdSql.init``.
            logSql: Not used by this method.
            replicaVar: Replica setting; converted with ``int`` and stored on
                the instance as ``replicaVar``.
        """
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to execute {__file__}")

        cursor = conn.cursor()
        tdSql.init(cursor)

        # Set by check_distribute_datas(): maps a vgroup id to its child table names.
        self.vnode_disbutes = None
        # Base timestamp for the rows inserted by the join part of distribute_agg_query().
        self.ts = 1537146000000


    def check_count_functions(self, tbname , col_name):
        """Cross-check ``count(col_name)`` on one table against a reference query.

        The reference query sums a constant ``1`` over the rows whose
        ``col_name`` is not null.  The raw result sets of both queries are
        compared; a mismatch is reported through ``tdLog.exit``, a match
        through ``tdLog.info``.

        Args:
            tbname: Table name as it should appear in the SQL text.
            col_name: Column handed to ``count``.
        """
        count_sql = f"select count({col_name}) from {tbname};"
        reference_sql = f"select sum(c) from (select {col_name} ,1 as c  from {tbname} where {col_name} is not null) "

        tdSql.query(count_sql)
        count_rows = tdSql.queryResult

        tdSql.query(reference_sql)
        reference_rows = tdSql.queryResult

        if count_rows != reference_rows:
            tdLog.exit(" count function work not as expected, sql : %s "% count_sql)
            return
        tdLog.info(" count function work as expected, sql : %s "% count_sql)

C
cpwu 已提交
36
    def prepare_datas_of_distribute(self, dbname="testdb"):
        """Create the test database, super table, child tables and rows.

        Creates ``dbname`` with 5 vgroups, the super table ``stb1`` and 20
        child tables ``ct1`` .. ``ct20``, then issues the insert statements
        whose results the later checks rely on.  ``ct1`` and ``ct4`` get their
        own value patterns plus a few hand-written rows (including NULLs);
        every other child table gets 9 generated rows.

        Args:
            dbname: Name of the database to create and populate.
        """
        # prepare data for 20 tables distributed over different vgroups
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname} ")
        tdSql.execute(
            f'''create table {dbname}.stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        # child tables ct1..ct20; the tag values are derived from the zero-based index
        for idx in range(20):
            tdSql.execute(f'create table {dbname}.ct{idx+1} using {dbname}.stb1 tags ( now(), {idx}, {11111*idx}, {111*idx}, {idx}, {1.11*idx}, {11.11*idx}, {idx%2}, "binary{idx}", "nchar{idx}" )')

        # ct1 rows step back 10 seconds each, ct4 rows step back 90 days each
        for n in range(9):
            for child, back_off in (("ct1", f"{n*10}s"), ("ct4", f"{n*90}d")):
                tdSql.execute(
                    f"insert into {dbname}.{child} values ( now()-{back_off}, {n}, {11111*n}, {111*n}, {11*n}, {1.11*n}, {11.11*n}, {n%2}, 'binary{n}', 'nchar{n}', now()+{n}a )"
                )

        # all remaining child tables: 9 rows each, values shifted by the table number
        for table_no in range(1, 21):
            if table_no in (1, 4):
                continue
            tbname = f"{dbname}.ct{table_no}"
            for j in range(9):
                k = table_no + j
                tdSql.execute(
                    f"insert into {tbname} values ( now()-{k*10}s, {k}, {11111*k}, {111*k}, {11*j}, {1.11*k}, {11.11*k}, {k%2}, 'binary{j}', 'nchar{j}', now()+{j}a )"
                )

        # hand-written rows for ct1, two of them carrying a single NULL column
        ct1_extra_rows = (
            "(now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )",
            "(now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )",
            "(now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )",
            "(now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )",
        )
        for row_values in ct1_extra_rows:
            tdSql.execute(f"insert into {dbname}.ct1 values {row_values}")

        # all-NULL rows for ct4
        ct4_null_rows = (
            "(now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ",
            "(now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ",
            "(now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ",
        )
        for row_values in ct4_null_rows:
            tdSql.execute(f"insert into {dbname}.ct4 values {row_values}")

        tdLog.info(" prepare data for distributed_aggregate done! ")

C
cpwu 已提交
79
    def check_distribute_datas(self, dbname="testdb"):
        """Map child tables to vgroups and verify they are spread out.

        Stores a dict on ``self.vnode_disbutes`` whose keys are the first
        column of ``show <dbname>.vgroups`` and whose values are lists of
        ``ct*`` table names.  Calls ``tdLog.exit`` when fewer than two vgroups
        hold at least two child tables.

        Args:
            dbname: Database whose vgroups and child tables are inspected.
        """
        # one empty bucket per vgroup id
        tdSql.query(f"show {dbname}.vgroups ")
        tables_by_vgroup = {vgroup_row[0]: [] for vgroup_row in tdSql.queryResult}

        # Put every child table into the bucket of its vgroup.
        # NOTE(review): column 6 of ins_tables is used as the vgroup id and
        # column 0 as the table name - confirm against the server version in use.
        tdSql.query(f"select * from information_schema.ins_tables where db_name = '{dbname}' and table_name like 'ct%'")
        for table_row in tdSql.queryResult:
            tables_by_vgroup[table_row[6]].append(table_row[0])
        self.vnode_disbutes = tables_by_vgroup

        # at least two vgroups must each contain two or more child tables
        well_filled = sum(1 for tables in tables_by_vgroup.values() if len(tables) >= 2)
        if well_filled < 2:
            tdLog.exit(" the datas of all not satisfy sub_table has been distributed ")

C
cpwu 已提交
105 106
    def check_count_distribute_diff_vnode(self,col_name, dbname="testdb"):
        """Check ``count(col_name)`` over child tables that sit in different vgroups.

        One child table is picked at random from every vgroup that holds at
        least two tables.  The super table ``stb1`` is then queried with a
        ``tbname in (...)`` filter, and ``count`` is compared with a ``sum``
        over a constant ``1`` for the rows whose column is not null.  A
        mismatch is reported through ``tdLog.exit``, a match through
        ``tdLog.info``.

        Args:
            col_name: Column handed to ``count``.
            dbname: Database holding the ``stb1`` super table.
        """
        # Build the vgroup map on demand so this check does not crash with an
        # AttributeError when it runs before check_distribute_datas().
        if getattr(self, "vnode_disbutes", None) is None:
            self.check_distribute_datas(dbname)

        # one random representative per sufficiently populated vgroup
        picked_tables = [
            random.choice(tables)
            for tables in self.vnode_disbutes.values()
            if len(tables) >= 2
        ]
        tbname_filters = ", ".join(f"'{name}'" for name in picked_tables)

        count_sql = f"select count({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters});"
        reference_sql = f"select sum(c) from (select {col_name} ,1 as c  from {dbname}.stb1 where tbname in ({tbname_filters}) and {col_name} is not null) "

        tdSql.query(count_sql)
        count_rows = tdSql.queryResult

        tdSql.query(reference_sql)
        reference_rows = tdSql.queryResult

        if count_rows != reference_rows:
            tdLog.exit(" count function work not as expected, sql : %s "% count_sql)
        else:
            tdLog.info(" count function work as expected, sql : %s "% count_sql)

C
cpwu 已提交
138 139 140 141
    def check_count_status(self, dbname="testdb"):
        """Run the count cross-checks for every child table and numeric column.

        Collects the ``ct*`` tables of ``dbname`` and the entries of
        ``desc <dbname>.stb1`` whose type is one of INT, BIGINT, SMALLINT,
        TINYINT, FLOAT or DOUBLE.  Every table/column pair goes through
        ``check_count_functions``; afterwards each column whose name starts
        with ``c`` goes through ``check_count_distribute_diff_vnode``.

        Args:
            dbname: Database holding the child tables and ``stb1``.
        """
        # check count function work status on each child table
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        qualified_tables = [f"{dbname}.{row[0]}" for row in tdSql.queryResult]

        tdSql.query(f"desc {dbname}.stb1")
        numeric_types = ("INT", "BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE")
        numeric_columns = [row[0] for row in tdSql.queryResult if row[1] in numeric_types]

        for table in qualified_tables:
            for column in numeric_columns:
                self.check_count_functions(table, column)

        # check count function across different vnodes
        for column in numeric_columns:
            # Names not starting with "c" are skipped here; the original
            # author disabled this check for them with the note "bug for tag".
            if not column.startswith("c"):
                continue
            self.check_count_distribute_diff_vnode(column, dbname)

C
cpwu 已提交
168
    def distribute_agg_query(self, dbname="testdb"):
        """Run count() queries against the distributed super table and check them.

        Covers basic filters, partition by, union all, a join on a scratch
        database named ``db``, group by, per-partition ``max`` cross-checks,
        nested queries and a mix with other aggregate functions.  All expected
        values are hard-coded and tied to the rows inserted by
        ``prepare_datas_of_distribute``.

        Args:
            dbname: Database holding the ``stb1`` super table.
        """
        stb = f"{dbname}.stb1"

        # basic filter: (sql, expected value at row 0 / column 0)
        basic_cases = (
            (f"select count(c1) from {stb} ", 184),
            (f"select count(c1) from {stb} where t1=1", 9),
            (f"select count(c1+c2) from {stb} where c1 =1 ", 2),
            (f'select count(c1) from {stb} where tbname="ct2"', 9),
        )
        for sql, expected in basic_cases:
            tdSql.query(sql)
            tdSql.checkData(0, 0, expected)

        # partition by tbname: (sql, expected number of result rows)
        partition_cases = (
            (f"select count(c1) from {stb} partition by tbname", 20),
            (f"select count(c1) from {stb} where t1> 4  partition by tbname", 15),
        )
        for sql, expected_rows in partition_cases:
            tdSql.query(sql)
            tdSql.checkRows(expected_rows)

        # union all
        tdSql.query(f"select count(c1) from {stb} union all select count(c1) from {stb} ")
        tdSql.checkRows(2)
        tdSql.checkData(0, 0, 184)

        # join: two child tables with identical rows in a scratch database
        join_setup = (
            " create database if not exists db ",
            " use db ",
            " create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ",
            " create table db.tb1 using db.st tags(1) ",
            " create table db.tb2 using db.st tags(2) ",
        )
        for statement in join_setup:
            tdSql.execute(statement)

        for i in range(10):
            ts = self.ts + i * 10
            for child in ("tb1", "tb2"):
                tdSql.execute(f" insert into db.{child} values({ts},{i},{i}.0)")

        tdSql.query("select count(tb1.c1), count(tb2.c2) from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts")
        tdSql.checkRows(1)
        for col in (0, 1):
            tdSql.checkData(0, col, 10)

        # group by
        tdSql.execute(f" use {dbname} ")

        tdSql.query(f"select count(*)  from {stb} ")
        tdSql.checkData(0, 0, 187)
        for group_col, expected_rows in (("t1", 20), ("c1", 30), ("c2", 31)):
            tdSql.query(f"select count(*)  from {stb} group by {group_col} ")
            tdSql.checkRows(expected_rows)

        # partition by tbname or partition by tag: the max reported for each
        # partition must equal the max queried directly from that child table
        for partition_key in ("tbname", "t1"):
            tdSql.query(f"select max(c1),tbname from {stb} partition by {partition_key}")
            # keep a local reference: the queries below replace tdSql.queryResult
            partition_rows = tdSql.queryResult
            for row in partition_rows:
                tdSql.query(f"select max(c1) from {dbname}.{row[1]} ")
                tdSql.checkData(0, 0, row[0])

        # nested queries
        tdSql.query(f"select abs(c2+2)+1 from (select count(c1) c2  from {stb})")
        tdSql.checkData(0, 0, 187.000000000)
        nested_count_sqls = (
            f"select count(c1+2)  as c2 from (select ts ,c1 ,c2  from {stb})",
            f"select count(a+2)  as c2 from (select ts ,abs(c1) a ,c2  from {stb})",
        )
        for sql in nested_count_sqls:
            tdSql.query(sql)
            tdSql.checkData(0, 0, 184)

        # mixup with other functions
        tdSql.query(f"select max(c1),count(c1),last(c2,c3) from {stb}")
        for col, expected in enumerate((28, 184, -99999, -999)):
            tdSql.checkData(0, col, expected)

    def run(self):
        """Execute the case: prepare data, verify distribution, then run the count checks."""
        steps = (
            self.prepare_datas_of_distribute,
            self.check_distribute_datas,
            self.check_count_status,
            self.distribute_agg_query,
        )
        # The order matters: every later step relies on what the earlier ones set up.
        for step in steps:
            step()

C
cpwu 已提交
263

264 265 266 267 268 269
    def stop(self):
        """Close the shared ``tdSql`` handle and log that the case finished."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

# Hand a fresh TDTestCase instance to each platform hook of tdCases
# (addWindows / addLinux) - presumably this registers the case for runs on
# that platform; confirm against util.cases.
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())