# distribute_agg_min.py
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import random


class TDTestCase:
    """Distributed-aggregate test case for the ``min`` function.

    The case creates a super table with 20 child tables in a database that
    has 5 vgroups, checks that the child tables are spread over several
    vgroups, and then verifies ``min`` per table, across vgroups, and in
    combination with filters, partitions, unions, joins, grouping, nested
    queries and other aggregate functions.
    """

    # NOTE(review): presumably read by the test framework to override server
    # configuration before the case starts - confirm against the runner.
    updatecfgDict = {"maxTablesPerVnode": 2, "minTablesPerVnode": 2, "tableIncStepPerVnode": 2}

    # Column types (as reported by ``desc``) on which ``min`` is exercised.
    _NUMERIC_TYPES = ("INT", "BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE")

    def init(self, conn, logSql, replicaVar=1):
        """Bind the shared SQL helper to ``conn`` and reset per-case state.

        ``logSql`` and ``replicaVar`` are accepted but not used by this case.
        """
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        # Mapping {vgroup id: [child table names]}; filled in by
        # check_distribute_datas().
        self.vnode_disbutes = None
        # Base timestamp (epoch milliseconds) for the rows of the join test.
        self.ts = 1537146000000

    @staticmethod
    def _quote_names(names):
        """Return ``names`` as a comma-separated list of single-quoted SQL literals."""
        return ", ".join(f"'{name}'" for name in names)

    def _assert_min_matches_sorted(self, min_sql, same_sql):
        """Run both statements and require that they return the same result.

        ``min_sql`` is the ``min()`` query under test; ``same_sql`` is the
        equivalent "order by ... asc limit 1" query used as the reference.
        A mismatch is reported through ``tdLog.exit``.
        """
        tdSql.query(min_sql)
        min_result = tdSql.queryResult

        tdSql.query(same_sql)
        same_result = tdSql.queryResult

        if min_result != same_result:
            tdLog.exit(" min function work not as expected, sql : %s " % min_sql)
        else:
            tdLog.info(" min function work as expected, sql : %s " % min_sql)

    def check_min_functions(self, tbname, col_name):
        """Check ``min(col_name)`` on ``tbname`` against the smallest non-null value."""
        min_sql = f"select min({col_name}) from {tbname};"
        same_sql = f"select {col_name} from {tbname} where {col_name} is not null order by {col_name} asc limit 1"
        self._assert_min_matches_sorted(min_sql, same_sql)

    def prepare_datas_of_distribute(self, dbname="testdb"):
        """Create ``dbname`` with super table ``stb1``, 20 child tables and their rows."""
        # Prepare data for 20 child tables distributed over different vgroups.
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname} ")
        tdSql.execute(
            f'''create table {dbname}.stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        # Child tables ct1..ct20; the tag values are derived from the index.
        for i in range(20):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')

        # ct1 gets rows 10 seconds apart, ct4 gets rows 90 days apart.
        for i in range(9):
            tdSql.execute(
                f"insert into {dbname}.ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )
            tdSql.execute(
                f"insert into {dbname}.ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )

        # Every other child table gets 9 rows whose values are offset by the
        # table index, so each table has a different minimum.
        for i in range(1, 21):
            if i in (1, 4):
                continue
            tbname = f"{dbname}.ct{i}"
            for j in range(9):
                tdSql.execute(
                    f"insert into {tbname} values ( now()-{(i+j)*10}s, {1*(j+i)}, {11111*(j+i)}, {111*(j+i)}, {11*(j)}, {1.11*(j+i)}, {11.11*(j+i)}, {(j+i)%2}, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
                )

        # Extra ct1 rows: zeros, negative values and single NULL cells.
        tdSql.execute(f"insert into {dbname}.ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")

        # Extra ct4 rows that are NULL in every data column.
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ")

        tdLog.info(" prepare data for distributed_aggregate done! ")

    def check_distribute_datas(self, dbname="testdb"):
        """Record which child tables live in which vgroup and check the spread.

        Stores ``{vgroup id: [table names]}`` in ``self.vnode_disbutes`` and
        reports through ``tdLog.exit`` unless at least two vgroups hold two or
        more child tables each.
        """
        # One empty bucket per vgroup, keyed by the first column of each row.
        tdSql.query(f"show {dbname}.vgroups ")
        vnode_tables = {vgroup[0]: [] for vgroup in tdSql.queryResult}

        # Sort every child table into its vgroup's bucket. Column 0 is used as
        # the table name and column 6 as the vgroup key.
        # NOTE(review): relies on the column layout of ins_tables - confirm.
        tdSql.query(f"select * from information_schema.ins_tables where db_name = '{dbname}' and table_name like 'ct%'")
        for table_row in tdSql.queryResult:
            vnode_tables[table_row[6]].append(table_row[0])
        self.vnode_disbutes = vnode_tables

        populated = sum(1 for tables in vnode_tables.values() if len(tables) >= 2)
        if populated < 2:
            tdLog.exit(" the datas of all not satisfy sub_table has been distributed ")

    def check_min_distribute_diff_vnode(self, col_name, dbname="testdb"):
        """Check ``min(col_name)`` over child tables taken from different vgroups.

        One table is picked at random from every vgroup that holds at least
        two tables (as recorded by ``check_distribute_datas``), and the super
        table is queried with a ``tbname in (...)`` filter on those tables.
        """
        picked_tables = [
            random.choice(tables)
            for tables in self.vnode_disbutes.values()
            if len(tables) >= 2
        ]
        tbname_filters = self._quote_names(picked_tables)

        min_sql = f"select min({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters});"
        same_sql = f"select {col_name} from {dbname}.stb1 where tbname in ({tbname_filters}) and {col_name} is not null order by {col_name} asc limit 1"
        self._assert_min_matches_sorted(min_sql, same_sql)

    def check_min_status(self, dbname="testdb"):
        """Run the ``min`` checks for every numeric column of every child table."""
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        tablenames = [f"{dbname}.{table_row[0]}" for table_row in tdSql.queryResult]

        # Keep only the columns whose declared type is numeric.
        tdSql.query(f"desc {dbname}.stb1")
        colnames = [
            col_row[0]
            for col_row in tdSql.queryResult
            if col_row[1] in self._NUMERIC_TYPES
        ]

        for tablename in tablenames:
            for colname in colnames:
                self.check_min_functions(tablename, colname)

        # Cross-vgroup check. Only names starting with "c" are checked here;
        # the remaining (tag) columns are skipped, which the original code
        # attributed to a "bug for tag".
        for colname in colnames:
            if colname.startswith("c"):
                self.check_min_distribute_diff_vnode(colname, dbname)

    def distribute_agg_query(self, dbname="testdb"):
        """Exercise ``min`` on the super table in a range of query shapes."""
        # basic filter
        tdSql.query(f"select min(c1) from {dbname}.stb1 where c1 is null")
        tdSql.checkRows(1)

        tdSql.query(f"select min(c1) from {dbname}.stb1 where t1=1")
        tdSql.checkData(0, 0, 2)

        tdSql.query(f"select min(c1+c2) from {dbname}.stb1 where c1 =1 ")
        tdSql.checkData(0, 0, 11112.000000000)

        tdSql.query(f'select min(c1) from {dbname}.stb1 where tbname="ct2"')
        tdSql.checkData(0, 0, 2)

        tdSql.query(f"select min(c1) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)

        tdSql.query(f"select min(c1) from {dbname}.stb1 where t1> 4  partition by tbname")
        tdSql.checkRows(15)

        # union all
        tdSql.query(f"select min(c1) from {dbname}.stb1 union all select min(c1) from {dbname}.stb1 ")
        tdSql.checkRows(2)
        tdSql.checkData(0, 0, 0)

        # join: two child tables with identical rows in a separate database
        tdSql.execute(" create database if not exists db ")
        tdSql.execute(" use db ")
        tdSql.execute(" create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ")
        tdSql.execute(" create table db.tb1 using db.st tags(1) ")
        tdSql.execute(" create table db.tb2 using db.st tags(2) ")

        for i in range(10):
            ts = i*10 + self.ts
            tdSql.execute(f" insert into db.tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into db.tb2 values({ts},{i},{i}.0)")

        tdSql.query("select min(tb1.c1), tb2.c2 from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, 0)
        # NOTE(review): this re-checks column 0; column 1 (tb2.c2) may have
        # been the intended target - confirm before changing.
        tdSql.checkData(0, 0, 0.00000)

        # group by
        tdSql.execute(f"use {dbname} ")
        tdSql.query(f"select min(c1),c1  from {dbname}.stb1 group by t1 ")
        tdSql.checkRows(20)
        tdSql.query(f"select min(c1),c1  from {dbname}.stb1 group by c1 ")
        tdSql.checkRows(30)
        tdSql.query(f"select min(c1),c2  from {dbname}.stb1 group by c2 ")
        tdSql.checkRows(31)

        # min together with plain columns in the select list
        tdSql.query(f"select min(c1),c2,c3,c5 from {dbname}.stb1")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, 0)
        tdSql.checkData(0, 1, 0)
        tdSql.checkData(0, 2, 0)
        tdSql.checkData(0, 3, 0)

        tdSql.query(f"select min(c1),t1,c2,t3 from {dbname}.stb1 where c1 > 5")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, 6)
        tdSql.checkData(0, 2, 66666)

        tdSql.query(f"select min(c1),ceil(t1),pow(c2,1)+2,abs(t3) from {dbname}.stb1 where c1 > 12")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, 13)
        tdSql.checkData(0, 2, 144445.000000000)

        # partition by tbname or by tag: each partition's minimum must equal
        # the minimum obtained by querying the named child table directly
        for partition_key in ("tbname", "t1"):
            tdSql.query(f"select min(c1),tbname from {dbname}.stb1 partition by {partition_key}")
            partition_rows = tdSql.queryResult

            for row in partition_rows:
                tbname = f"{dbname}.{row[1]}"
                tdSql.query(f"select min(c1) from {tbname} ")
                tdSql.checkData(0, 0, row[0])

        # nested queries around min
        tdSql.query(f"select abs(c2+2)+1 from (select min(c1) c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 3.000000000)
        tdSql.query(f"select min(c1+2)+1  as c2 from (select ts ,c1 ,c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 3.000000000)
        tdSql.query(f"select min(a+2)+1  as c2 from (select ts ,abs(c1) a ,c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 3.000000000)

        # mixed with other functions
        tdSql.query(f"select max(c1),count(c1),last(c2,c3) from {dbname}.stb1")
        tdSql.checkData(0, 0, 28)
        tdSql.checkData(0, 1, 184)
        tdSql.checkData(0, 2, -99999)
        tdSql.checkData(0, 3, -999)

    def run(self):
        """Prepare the data, then run every check of this case in order."""
        self.prepare_datas_of_distribute()
        self.check_distribute_datas()
        self.check_min_status()
        self.distribute_agg_query()

    def stop(self):
        """Close the SQL helper and report success."""
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)

# Register this case through both the addWindows and addLinux hooks of
# tdCases; each registration receives its own TDTestCase instance.
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())