from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import random


class TDTestCase:
    """Distributed-aggregation test for the ``spread`` function.

    Builds super table ``stb1`` with 20 child tables (``ct1`` .. ``ct20``) in a
    5-vgroup database, checks that ``spread(col)`` equals
    ``max(col) - min(col)`` per child table and across child tables living in
    different vgroups, then runs fixed-result queries (filters, partition,
    union all, join, group by, nested queries, mixed aggregates).
    """

    def init(self, conn, logSql):
        """Bind the shared ``tdSql`` helper to *conn* and reset per-run state.

        ``logSql`` is part of the framework's ``init`` signature and is not
        used here.
        """
        tdLog.debug(f"start to execute {__file__}")
        tdSql.init(conn.cursor())
        # vgroup id -> list of child-table names; filled by check_distribute_datas()
        self.vnode_disbutes = None
        # base timestamp for the rows inserted by the join check in distribute_agg_query()
        self.ts = 1537146000000

    def _check_spread_equals_max_minus_min(self, spread_sql, same_sql):
        """Exit the test unless *spread_sql* and *same_sql* return identical results."""
        tdSql.query(spread_sql)
        spread_result = tdSql.queryResult

        tdSql.query(same_sql)
        same_result = tdSql.queryResult

        if spread_result != same_result:
            tdLog.exit(f" spread function work not as expected, sql : {spread_sql} ")
        else:
            tdLog.info(f" spread function work as expected, sql : {spread_sql} ")

    def check_spread_functions(self, tbname, col_name):
        """Check that ``spread(col_name)`` equals ``max(col_name)-min(col_name)`` on *tbname*.

        Args:
            tbname: table to query (may be ``db.table`` qualified).
            col_name: numeric column or tag name.
        """
        spread_sql = f"select spread({col_name}) from {tbname};"
        same_sql = f"select max({col_name})-min({col_name}) from {tbname}"
        self._check_spread_equals_max_minus_min(spread_sql, same_sql)

    def prepare_datas_of_distribute(self, dbname="testdb"):
        """Create *dbname* with super table ``stb1``, 20 child tables and normal table ``t1``, and fill them.

        Data layout (the fixed expectations in distribute_agg_query() rely on it):
          * ct1: c1 = 0..8 plus extra rows with c1 = 0, 9, 9, 9 (some other columns NULL).
          * ct4: c1 = 0..8 spread over days, plus three all-NULL rows.
          * every other ct{i}: 9 rows with c1 = i..i+8.
          * t1: a fixed set of rows including NULLs and extreme values.
        """
        # prepare data for 20 tables distributed at different vgroups
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname}")
        tdSql.execute(
            f'''create table {dbname}.stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        tdSql.execute(
            f'''
            create table {dbname}.t1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            '''
        )

        # child table ct{i+1} gets tag t1 = i, so t1 ranges over 0..19
        for i in range(20):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')

        # ct1 and ct4 get their own data pattern
        for i in range(9):
            tdSql.execute(
                f"insert into {dbname}.ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )
            tdSql.execute(
                f"insert into {dbname}.ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )

        # all remaining child tables: 9 rows each, values shifted by the table index
        for i in range(1, 21):
            if i in (1, 4):
                continue
            tbname = f"ct{i}"
            for j in range(9):
                tdSql.execute(
                    f"insert into {dbname}.{tbname} values ( now()-{(i+j)*10}s, {1*(j+i)}, {11111*(j+i)}, {111*(j+i)}, {11*(j)}, {1.11*(j+i)}, {11.11*(j+i)}, {(j+i)%2}, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
                )

        tdSql.execute(f"insert into {dbname}.ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")

        tdSql.execute(f"insert into {dbname}.ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ")

        tdSql.execute(
            f'''insert into {dbname}.t1 values
            ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
            ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
            ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
            ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
            ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
            ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
            ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
            ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
            ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
            ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            '''
        )

        tdLog.info(f" prepare data for distributed_aggregate done! ")

    def check_distribute_datas(self, dbname="testdb"):
        """Map every vgroup to its ``ct*`` child tables and require real distribution.

        Stores the mapping in ``self.vnode_disbutes`` and exits the test when
        fewer than two vgroups hold at least two child tables each.
        """
        # one (initially empty) bucket per vgroup id, taken from column 0 of `show vgroups`
        tdSql.query(f"show {dbname}.vgroups ")
        vnode_tables = {row[0]: [] for row in tdSql.queryResult}

        # column 0 of `show tables` is the table name; column 6 is used as the vgroup id key
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        for row in tdSql.queryResult:
            vnode_tables[row[6]].append(row[0])
        self.vnode_disbutes = vnode_tables

        count = sum(1 for tables in vnode_tables.values() if len(tables) >= 2)
        if count < 2:
            tdLog.exit(f" the datas of all not satisfy sub_table has been distributed ")

    def check_spread_distribute_diff_vnode(self, col_name, dbname="testdb"):
        """Check spread == max - min over child tables that live in different vgroups.

        Picks one random child table from every vgroup holding at least two of
        them (requires check_distribute_datas() to have run) and queries
        ``stb1`` restricted to that set with ``tbname in (...)``.
        """
        distribute_tbnames = [
            random.choice(tables)
            for tables in self.vnode_disbutes.values()
            if len(tables) >= 2
        ]
        tbname_filters = ", ".join(f"'{tbname}'" for tbname in distribute_tbnames)

        spread_sql = f"select spread({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters})"
        same_sql = f"select max({col_name}) - min({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters})"
        self._check_spread_equals_max_minus_min(spread_sql, same_sql)

    def check_spread_status(self, dbname="testdb"):
        """Compare spread with max - min for every numeric column of ``stb1``.

        First per child table (data columns and tags alike), then across
        vgroups for the data columns (``c*``) only.
        """
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        tablenames = [f"{dbname}.{row[0]}" for row in tdSql.queryResult]

        # `desc` rows: column 0 is the name, column 1 the type name
        numeric_types = ("INT", "BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE")
        tdSql.query(f"desc {dbname}.stb1")
        colnames = [row[0] for row in tdSql.queryResult if row[1] in numeric_types]

        for tablename in tablenames:
            for colname in colnames:
                self.check_spread_functions(tablename, colname)

        # check spread function across different vnodes
        for colname in colnames:
            if colname.startswith("c"):
                self.check_spread_distribute_diff_vnode(colname, dbname=dbname)
            # TODO: tag columns are skipped here; the multi-vnode tag case was
            # marked "bug for tag" in the original test.

    def distribute_agg_query(self, dbname="testdb"):
        """Run spread queries with known results against the prepared data.

        Also creates database ``db`` (tables ``tb1``/``tb2``) for the join check
        and switches back to *dbname* afterwards.
        """
        # basic filter
        tdSql.query(f"select spread(c1) from {dbname}.stb1 where c1 is null")
        tdSql.checkRows(0)

        tdSql.query(f"select spread(c1) from {dbname}.stb1 where t1=1")
        tdSql.checkData(0, 0, 8.000000000)

        tdSql.query(f"select spread(c1+c2) from {dbname}.stb1 where c1 =1 ")
        tdSql.checkData(0, 0, 0.000000000)

        tdSql.query(f"select spread(c1) from {dbname}.stb1 where tbname=\"ct2\"")
        tdSql.checkData(0, 0, 8.000000000)

        tdSql.query(f"select spread(c1) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)

        # t1 takes the values 0..19, so t1 > 4 selects 15 child tables
        tdSql.query(f"select spread(c1) from {dbname}.stb1 where t1> 4  partition by tbname")
        tdSql.checkRows(15)

        # union all
        tdSql.query(f"select spread(c1) from {dbname}.stb1 union all select max(c1)-min(c1) from {dbname}.stb1 ")
        tdSql.checkRows(2)
        tdSql.checkData(0, 0, 28.000000000)

        # join
        tdSql.execute(f" create database if not exists db ")
        tdSql.execute(f" use db ")
        tdSql.execute(f" create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ")
        tdSql.execute(f" create table db.tb1 using db.st tags(1) ")
        tdSql.execute(f" create table db.tb2 using db.st tags(2) ")

        for i in range(10):
            ts = i * 10 + self.ts
            tdSql.execute(f" insert into db.tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into db.tb2 values({ts},{i},{i}.0)")

        tdSql.query(f"select spread(tb1.c1), spread(tb2.c2) from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, 9.000000000)
        # second column is spread(tb2.c2); the original re-checked column 0 here
        tdSql.checkData(0, 1, 9.00000)

        # group by
        tdSql.execute(f" use {dbname} ")
        tdSql.query(f" select max(c1),c1  from {dbname}.stb1 group by t1 ")
        tdSql.checkRows(20)
        tdSql.query(f" select max(c1),c1  from {dbname}.stb1 group by c1 ")
        tdSql.checkRows(30)
        tdSql.query(f" select max(c1),c2  from {dbname}.stb1 group by c2 ")
        tdSql.checkRows(31)

        # partition by tbname or partition by tag: every child table has its
        # own distinct t1 value and non-empty data, so both yield 20 groups
        tdSql.query(f"select spread(c1) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)
        tdSql.query(f"select spread(c1) from {dbname}.stb1 partition by t1")
        tdSql.checkRows(20)

        # nested query support for spread
        tdSql.query(f"select spread(c2+2)+1 from (select max(c1) c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 1.000000000)
        tdSql.query(f"select spread(c1+2)+1  as c2 from (select ts ,c1 ,c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 29.000000000)
        tdSql.query(f"select spread(a+2)+1  as c2 from (select ts ,abs(c1) a ,c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 29.000000000)

        # mixup with other functions
        tdSql.query(f"select max(c1),count(c1),last(c2,c3),spread(c1) from {dbname}.stb1")
        tdSql.checkData(0, 0, 28)
        tdSql.checkData(0, 1, 184)
        tdSql.checkData(0, 2, -99999)
        tdSql.checkData(0, 3, -999)
        tdSql.checkData(0, 4, 28.000000000)

    def run(self):
        """Prepare data, verify distribution, then run all spread checks."""
        self.prepare_datas_of_distribute()
        self.check_distribute_datas()
        self.check_spread_status()
        self.distribute_agg_query()

    def stop(self):
        """Close the SQL helper and report success."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

# Register this case with the framework for both platforms; each platform
# receives its own fresh TDTestCase instance (Windows first, then Linux).
for _register_case in (tdCases.addWindows, tdCases.addLinux):
    _register_case(__file__, TDTestCase())