# distribute_agg_spread.py
# Checks the spread() aggregate on sub tables distributed over several vgroups.
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
# random is used to pick one sample table per vgroup.
import random


class TDTestCase:
    """Distributed-aggregate test case for the ``spread()`` function.

    The case creates a database with several vgroups, spreads 20 sub tables
    of one super table over them, and then verifies that ``spread(col)``
    always equals ``max(col) - min(col)`` -- per table, across tables living
    in different vgroups, and in a set of fixed-result queries.
    """

    # Configuration overrides declared by this case.
    # NOTE(review): presumably consumed by the test framework before the
    # server starts -- confirm against the runner.
    updatecfgDict = {"maxTablesPerVnode":2 ,"minTablesPerVnode":2,"tableIncStepPerVnode":2 }

    # Column types (as reported by ``desc``) on which spread() is checked.
    _NUMERIC_TYPES = ("INT", "BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE")

    def init(self, conn, logSql, replicaVar=1):
        """Bind the shared ``tdSql`` helper to a cursor and reset case state.

        Args:
            conn: connection object; only ``conn.cursor()`` is used.
            logSql: accepted but not used by this case.
            replicaVar: accepted but not used by this case.
        """
        tdLog.debug(f"start to execute {__file__}")
        tdSql.init(conn.cursor())
        # Mapping vgroup id -> list of sub-table names, filled by
        # check_distribute_datas(). (Attribute name kept as-is for
        # compatibility.)
        self.vnode_disbutes = None
        # Base value for the ts column of the rows written in the join test.
        # NOTE(review): looks like epoch milliseconds -- confirm.
        self.ts = 1537146000000

    # ------------------------------------------------------------------
    # pure helpers (no server access)
    # ------------------------------------------------------------------
    @staticmethod
    def _group_tables_by_vgroup(vgroup_rows, table_rows):
        """Return ``{vgroup_id: [table_name, ...]}``.

        Args:
            vgroup_rows: result rows of ``show <db>.vgroups``; column 0 is
                used as the vgroup id.
            table_rows: result rows of ``information_schema.ins_tables``;
                column 0 is used as the table name and column 6 as the
                vgroup id.
                NOTE(review): the column-6 position is taken from the
                original code -- confirm it against the server version.

        Raises:
            KeyError: if a table row refers to a vgroup id that is not
                present in ``vgroup_rows``.
        """
        tables_by_vgroup = {row[0]: [] for row in vgroup_rows}
        for row in table_rows:
            tables_by_vgroup[row[6]].append(row[0])
        return tables_by_vgroup

    @classmethod
    def _numeric_column_names(cls, desc_rows):
        """Return the names (column 0) of ``desc`` rows whose type (column 1) is numeric."""
        return [row[0] for row in desc_rows if row[1] in cls._NUMERIC_TYPES]

    @staticmethod
    def _quoted_tbname_list(tbnames):
        """Return the table names single-quoted and comma-separated, for ``tbname in (...)``."""
        return ", ".join(f"'{tbname}'" for tbname in tbnames)

    # ------------------------------------------------------------------
    # shared check
    # ------------------------------------------------------------------
    def _assert_spread_matches(self, spread_sql, same_sql):
        """Run both statements and require identical result sets.

        Calls ``tdLog.exit`` when the results differ and ``tdLog.info``
        otherwise; the message always quotes ``spread_sql``.
        """
        tdSql.query(spread_sql)
        spread_result = tdSql.queryResult

        tdSql.query(same_sql)
        same_result = tdSql.queryResult

        if spread_result != same_result:
            tdLog.exit(f" spread function work not as expected, sql : {spread_sql} ")
        else:
            tdLog.info(f" spread function work as expected, sql : {spread_sql} ")

    def check_spread_functions(self, tbname , col_name):
        """Check ``spread(col_name)`` against ``max - min`` on a single table.

        Args:
            tbname: table name as it should appear in the FROM clause.
            col_name: column to aggregate.
        """
        spread_sql = f"select spread({col_name}) from {tbname};"
        same_sql = f"select max({col_name})-min({col_name}) from {tbname}"
        self._assert_spread_matches(spread_sql, same_sql)

    def prepare_datas_of_distribute(self, dbname="testdb"):
        """Create the database, the tables and all rows used by this case.

        Creates ``dbname`` with 5 vgroups, the super table ``stb1`` with 20
        sub tables ``ct1`` .. ``ct20``, and the plain table ``t1``.
        """
        # prepare data for 20 tables distributed over different vgroups
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname}")
        tdSql.execute(
            f'''create table {dbname}.stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        tdSql.execute(
            f'''
            create table {dbname}.t1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            '''
        )

        # sub table ct{i+1} carries tag values derived from i
        for i in range(20):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')

        # ct1 and ct4 get their own data pattern (ct4 is spaced 90 days apart)
        for i in range(9):
            tdSql.execute(
                f"insert into {dbname}.ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )
            tdSql.execute(
                f"insert into {dbname}.ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )

        # every other sub table gets 9 rows whose values are offset by the table index
        for i in range(1,21):
            if i == 1 or i == 4:
                continue
            tbname = f"ct{i}"
            for j in range(9):
                tdSql.execute(
                    f"insert into {dbname}.{tbname} values ( now()-{(i+j)*10}s, {1*(j+i)}, {11111*(j+i)}, {111*(j+i)}, {11*(j)}, {1.11*(j+i)}, {11.11*(j+i)}, {(j+i)%2}, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
                )

        # extra rows for ct1, including negative values and single NULL cells
        tdSql.execute(f"insert into {dbname}.ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")

        # all-NULL rows for ct4
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ")

        tdSql.execute(
            f'''insert into {dbname}.t1 values
            ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
            ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
            ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
            ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
            ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
            ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
            ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
            ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
            ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
            ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            '''
        )

        tdLog.info(f" prepare data for distributed_aggregate done! ")

    def check_distribute_datas(self, dbname="testdb"):
        """Record which sub tables live in which vgroup and verify the spread.

        Stores the ``{vgroup_id: [table_name, ...]}`` mapping in
        ``self.vnode_disbutes`` and calls ``tdLog.exit`` unless at least two
        vgroups hold two or more ``ct*`` tables each.
        """
        # get vgroup ids of the database
        tdSql.query(f"show {dbname}.vgroups ")
        vgroup_rows = tdSql.queryResult

        # look up the vgroup of every sub table
        tdSql.query(f"select * from information_schema.ins_tables where db_name = '{dbname}' and table_name like 'ct%'")
        table_rows = tdSql.queryResult

        self.vnode_disbutes = self._group_tables_by_vgroup(vgroup_rows, table_rows)

        populated = sum(1 for tables in self.vnode_disbutes.values() if len(tables) >= 2)
        if populated < 2:
            tdLog.exit(f" the datas of all not satisfy sub_table has been distributed ")

    def check_spread_distribute_diff_vnode(self,col_name, dbname="testdb"):
        """Check ``spread(col_name)`` over tables taken from different vgroups.

        One table is picked at random from every vgroup that holds at least
        two tables; the super-table query is then restricted to those tables
        and compared with ``max - min`` over the same set.

        Args:
            col_name: column of ``stb1`` to aggregate.
            dbname: database that holds ``stb1``.
        """
        # Populate the distribution map on demand instead of failing on None.
        if self.vnode_disbutes is None:
            self.check_distribute_datas(dbname)

        distribute_tbnames = [
            random.choice(tables)
            for tables in self.vnode_disbutes.values()
            if len(tables) >= 2
        ]
        if not distribute_tbnames:
            # An empty "in ()" list would only produce a confusing SQL error.
            tdLog.exit(f" no vgroup of {dbname} holds at least two sub tables, can not check spread({col_name}) ")
            return

        tbname_filters = self._quoted_tbname_list(distribute_tbnames)

        spread_sql = f"select spread({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters})"
        same_sql = f"select max({col_name}) - min({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters})"
        self._assert_spread_matches(spread_sql, same_sql)

    def check_spread_status(self, dbname="testdb"):
        """Run the spread-vs-(max - min) check on every table and numeric column.

        Every ``ct*`` table is checked on every numeric column reported by
        ``desc stb1``; columns whose name starts with ``c`` are additionally
        checked across vgroups.
        """
        # check spread function on every single sub table
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        tablenames = [f"{dbname}.{row[0]}" for row in tdSql.queryResult]

        tdSql.query(f"desc {dbname}.stb1")
        colnames = self._numeric_column_names(tdSql.queryResult)

        for tablename in tablenames:
            for colname in colnames:
                self.check_spread_functions(tablename,colname)

        # check spread function across different vnodes
        for colname in colnames:
            if colname.startswith("c"):
                # forward dbname so a non-default database is honoured
                self.check_spread_distribute_diff_vnode(colname, dbname)

    def distribute_agg_query(self, dbname="testdb"):
        """Run fixed queries that mix ``spread()`` with filters, joins and nesting.

        Besides ``dbname`` this method creates and uses a second database
        named ``db`` for the join test.
        """
        # basic filter
        tdSql.query(f"select spread(c1) from {dbname}.stb1 where c1 is null")
        tdSql.checkRows(1)

        tdSql.query(f"select spread(c1) from {dbname}.stb1 where t1=1")
        tdSql.checkData(0,0,8.000000000)

        tdSql.query(f"select spread(c1+c2) from {dbname}.stb1 where c1 =1 ")
        tdSql.checkData(0,0,0.000000000)

        tdSql.query(f"select spread(c1) from {dbname}.stb1 where tbname=\"ct2\"")
        tdSql.checkData(0,0,8.000000000)

        tdSql.query(f"select spread(c1) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)

        tdSql.query(f"select spread(c1) from {dbname}.stb1 where t1> 4  partition by tbname")
        tdSql.checkRows(15)

        # union all
        tdSql.query(f"select spread(c1) from {dbname}.stb1 union all select max(c1)-min(c1) from {dbname}.stb1 ")
        tdSql.checkRows(2)
        tdSql.checkData(0,0,28.000000000)

        # join
        tdSql.execute(" create database if not exists db ")
        tdSql.execute(" use db ")
        tdSql.execute(" create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ")
        tdSql.execute(" create table db.tb1 using db.st tags(1) ")
        tdSql.execute(" create table db.tb2 using db.st tags(2) ")

        # both tables receive the same ten rows: c1 = c2 = 0 .. 9
        for i in range(10):
            ts = i*10 + self.ts
            tdSql.execute(f" insert into db.tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into db.tb2 values({ts},{i},{i}.0)")

        tdSql.query("select spread(tb1.c1), spread(tb2.c2) from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts")
        tdSql.checkRows(1)
        tdSql.checkData(0,0,9.000000000)
        # second output column is spread(tb2.c2); the original re-checked column 0
        tdSql.checkData(0,1,9.00000)

        # group by
        tdSql.execute(f" use {dbname} ")
        tdSql.query(f" select max(c1),c1  from {dbname}.stb1 group by t1 ")
        tdSql.checkRows(20)
        tdSql.query(f" select max(c1),c1  from {dbname}.stb1 group by c1 ")
        tdSql.checkRows(30)
        tdSql.query(f" select max(c1),c2  from {dbname}.stb1 group by c2 ")
        tdSql.checkRows(31)

        # partition by tbname or partition by tag (only required to execute)
        tdSql.query(f"select spread(c1) from {dbname}.stb1 partition by tbname")

        # nested queries
        tdSql.query(f"select spread(c2+2)+1 from (select max(c1) c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,1.000000000)
        tdSql.query(f"select spread(c1+2)+1  as c2 from (select ts ,c1 ,c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,29.000000000)
        tdSql.query(f"select spread(a+2)+1  as c2 from (select ts ,abs(c1) a ,c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,29.000000000)

        # mixup with other functions; last(c2,c3) expands to two output columns
        tdSql.query(f"select max(c1),count(c1),last(c2,c3),spread(c1) from {dbname}.stb1")
        tdSql.checkData(0,0,28)
        tdSql.checkData(0,1,184)
        tdSql.checkData(0,2,-99999)
        tdSql.checkData(0,3,-999)
        tdSql.checkData(0,4,28.000000000)

    def run(self):
        """Execute the case: prepare data, verify distribution, then run all checks."""
        self.prepare_datas_of_distribute()
        self.check_distribute_datas()
        self.check_spread_status()
        self.distribute_agg_query()

    def stop(self):
        """Close the shared ``tdSql`` helper and report success."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")
# Register this case with tdCases for both Windows and Linux, each with its own TDTestCase instance.
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())