# distribute_agg_sum.py
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import random ,os ,sys
import platform


class TDTestCase:
    """Distributed-aggregation test for the ``sum()`` function.

    Creates super table ``stb1`` with child tables ``ct1``..``ct20`` (in a
    database created with ``vgroups 5``) plus a normal table ``t1``.  It then
    checks ``sum()`` results against values computed client-side with numpy,
    including queries over child tables that live on different vgroups.
    """

    # NOTE(review): presumably caps tables per vnode so the child tables are
    # spread over several vgroups -- confirm these server options still apply.
    updatecfgDict = {"maxTablesPerVnode": 2, "minTablesPerVnode": 2, "tableIncStepPerVnode": 2}

    def init(self, conn, logSql, replicaVar=1):
        """Bind the shared SQL helper to ``conn`` and reset per-run state.

        ``logSql`` and ``replicaVar`` are accepted for the test framework's
        calling convention but are not used by this test.
        """
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        # Set by check_distribute_datas(): vgroup id -> list of child-table names.
        self.vnode_disbutes = None
        # Base timestamp for rows inserted into db.tb1 / db.tb2
        # (presumably milliseconds, the default precision).
        self.ts = 1537146000000

    @staticmethod
    def _expected_sum(rows):
        """Return the client-side sum of the first column of ``rows``.

        ``rows`` is a query result (sequence of row tuples).  NULL (``None``)
        cells are skipped.  Signed-integer data is widened to int64 before
        summing, because numpy's default integer is 32-bit on Windows (NumPy < 2)
        and a 32-bit accumulation could overflow.  If no non-NULL value
        remains, ``None`` is returned, because SQL ``sum()`` over no values
        yields NULL rather than 0.
        """
        values = np.array([row[0] for row in rows if row[0] is not None])
        if values.size == 0:
            return None
        if np.issubdtype(values.dtype, np.signedinteger):
            values = values.astype(np.int64)
        return np.sum(values)

    @staticmethod
    def _quote_tbnames(tbnames):
        """Render table names as a comma-separated list of SQL string literals."""
        return ", ".join(f"'{name}'" for name in tbnames)

    def check_sum_functions(self, tbname, col_name):
        """Check ``sum(col_name)`` on ``tbname`` against a client-side sum."""
        sum_sql = f"select sum({col_name}) from {tbname};"
        same_sql = f"select {col_name} from {tbname} where {col_name} is not null "

        tdSql.query(same_sql)
        pre_sum = self._expected_sum(tdSql.queryResult)

        tdSql.query(sum_sql)
        tdSql.checkData(0, 0, pre_sum)

    def prepare_datas_of_distribute(self, dbname="testdb"):
        """Create and populate database ``dbname`` for the sum() checks.

        Creates super table ``stb1`` with child tables ``ct1``..``ct20`` and a
        normal table ``t1`` with the same data columns.  It then inserts rows,
        including NULLs and extreme values, into them.
        """
        # Prepare data for 20 child tables distributed across different vgroups.
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname}")
        tdSql.execute(
            f'''create table {dbname}.stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        tdSql.execute(
            f'''
            create table {dbname}.t1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            '''
        )
        for i in range(20):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')

        # ct1 gets 9 rows 10s apart; ct4 gets 9 rows 90 days apart.
        for i in range(9):
            tdSql.execute(
                f"insert into {dbname}.ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )
            tdSql.execute(
                f"insert into {dbname}.ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )

        # Every other child table (not ct1 / ct4) gets 9 rows of its own.
        for i in range(1, 21):
            if i in (1, 4):
                continue
            tbname = f"ct{i}"
            for j in range(9):
                tdSql.execute(
                    f"insert into {dbname}.{tbname} values ( now()-{(i+j)*10}s, {1*(j+i)}, {11111*(j+i)}, {111*(j+i)}, {11*(j)}, {1.11*(j+i)}, {11.11*(j+i)}, {(j+i)%2}, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
                )

        # Extra ct1 rows with zeros, negatives and scattered NULLs.
        tdSql.execute(f"insert into {dbname}.ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
        tdSql.execute(f"insert into {dbname}.ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")

        # All-NULL ct4 rows far in the past and in the future.
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
        tdSql.execute(f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ")

        tdSql.execute(
            f'''insert into {dbname}.t1 values
            ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
            ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
            ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
            ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
            ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
            ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
            ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
            ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
            ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
            ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            '''
        )

        tdLog.info(f" prepare data for distributed_aggregate done! ")

    def check_distribute_datas(self, dbname="testdb"):
        """Map each vgroup of ``dbname`` to the ``ct%`` child tables it holds.

        The mapping is stored in ``self.vnode_disbutes``.  The test exits if
        fewer than two vgroups hold at least two child tables each, because
        the cross-vnode checks would then be meaningless.
        """
        # Start with an empty table list for every vgroup id.
        tdSql.query(f"show {dbname}.vgroups ")
        vnode_tables = {row[0]: [] for row in tdSql.queryResult}

        # Record which vgroup each child table was placed on.
        tdSql.query(f"select * from information_schema.ins_tables where db_name = '{dbname}' and table_name like 'ct%'")
        for row in tdSql.queryResult:
            # NOTE(review): column 0 is used as the table name and column 6 as
            # the vgroup id -- confirm against the ins_tables schema.
            vnode_tables[row[6]].append(row[0])
        self.vnode_disbutes = vnode_tables

        count = sum(1 for tables in vnode_tables.values() if len(tables) >= 2)
        if count < 2:
            tdLog.exit(f" the datas of all not satisfy sub_table has been distributed ")

    def check_sum_distribute_diff_vnode(self, col_name, dbname="testdb"):
        """Check ``sum(col_name)`` over child tables taken from different vgroups.

        One child table is picked at random from every vgroup that holds at
        least two tables (per ``self.vnode_disbutes``, which
        check_distribute_datas() must have populated).  The aggregate
        therefore spans several vnodes.
        """
        distribute_tbnames = [
            random.choice(tables)
            for tables in self.vnode_disbutes.values()
            if len(tables) >= 2
        ]
        tbname_filters = self._quote_tbnames(distribute_tbnames)

        sum_sql = f"select sum({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters});"
        same_sql = f"select {col_name}  from {dbname}.stb1 where tbname in ({tbname_filters}) and {col_name} is not null "

        tdSql.query(same_sql)
        pre_sum = self._expected_sum(tdSql.queryResult)

        tdSql.query(sum_sql)
        tdSql.checkData(0, 0, pre_sum)

    def check_sum_status(self, dbname="testdb"):
        """Run the sum() checks for every numeric column and tag of ``stb1``."""
        # Fully qualified names of all child tables.
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        tablenames = [f"{dbname}.{row[0]}" for row in tdSql.queryResult]

        # Numeric columns and tags of stb1 (desc rows: name first, type second).
        tdSql.query(f"desc {dbname}.stb1")
        numeric_types = {"INT", "BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE"}
        colnames = [row[0] for row in tdSql.queryResult if row[1] in numeric_types]

        # Check sum() on each child table individually.
        for tablename in tablenames:
            for colname in colnames:
                self.check_sum_functions(tablename, colname)

        # Check sum() across vnodes; only data columns (c*), not tags (t*).
        # dbname is forwarded so a non-default database is actually queried.
        for colname in colnames:
            if colname.startswith("c"):
                self.check_sum_distribute_diff_vnode(colname, dbname)

    def distribute_agg_query(self, dbname="testdb"):
        """Check sum() in filtered, partitioned, union, join, nested and mixed queries.

        Expected values correspond to the data inserted by
        prepare_datas_of_distribute().  Also creates database ``db`` with
        tables ``tb1``/``tb2`` for the join case, then switches back to
        ``dbname``.
        """
        # basic filter
        tdSql.query(f"select sum(c1) from {dbname}.stb1 ")
        tdSql.checkData(0, 0, 2592)

        tdSql.query(f"select sum(a) from (select sum(c1) a  from {dbname}.stb1 partition by tbname) ")
        tdSql.checkData(0, 0, 2592)

        tdSql.query(f"select sum(c1) from {dbname}.stb1 where t1=1")
        tdSql.checkData(0, 0, 54)

        tdSql.query(f"select sum(c1+c2) from {dbname}.stb1 where c1 =1 ")
        tdSql.checkData(0, 0, 22224.000000000)

        tdSql.query(f"select sum(c1) from {dbname}.stb1 where tbname=\"ct2\"")
        tdSql.checkData(0, 0, 54)

        tdSql.query(f"select sum(c1) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)

        tdSql.query(f"select sum(c1) from {dbname}.stb1 where t1> 4  partition by tbname")
        tdSql.checkRows(15)

        # union all
        tdSql.query(f"select sum(c1) from {dbname}.stb1 union all select sum(c1) from {dbname}.stb1 ")
        tdSql.checkRows(2)
        tdSql.checkData(0, 0, 2592)

        tdSql.query(f"select sum(a) from (select sum(c1) a from {dbname}.stb1 union all select sum(c1) a  from {dbname}.stb1)")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, 5184)

        # join
        tdSql.execute(" create database if not exists db ")
        tdSql.execute(" use db ")
        tdSql.execute(" create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ")
        tdSql.execute(" create table db.tb1 using db.st tags(1) ")
        tdSql.execute(" create table db.tb2 using db.st tags(2) ")

        for i in range(10):
            ts = i*10 + self.ts
            tdSql.execute(f" insert into db.tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into db.tb2 values({ts},{i},{i}.0)")

        tdSql.query("select sum(tb1.c1), sum(tb2.c2) from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, 45)
        tdSql.checkData(0, 1, 45.000000000)

        # group by
        tdSql.execute(f"use {dbname} ")

        # partition by tbname or partition by tag
        tdSql.query(f"select sum(c1) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)

        # nested queries with sum
        tdSql.query(f"select abs(c2+2)+1 from (select sum(c1) c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 2595.000000000)
        tdSql.query(f"select sum(c1+2)  as c2 from (select ts ,c1 ,c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 2960.000000000)
        tdSql.query(f"select sum(a+2)  as c2 from (select ts ,abs(c1) a ,c2  from {dbname}.stb1)")
        tdSql.checkData(0, 0, 2960.000000000)

        # mixed with other functions; last(c2,c3) yields two result columns
        tdSql.query(f"select max(c1),count(c1),last(c2,c3),sum(c1+c2) from {dbname}.stb1")
        tdSql.checkData(0, 0, 28)
        tdSql.checkData(0, 1, 184)
        tdSql.checkData(0, 2, -99999)
        tdSql.checkData(0, 3, -999)
        tdSql.checkData(0, 4, 28202310.000000000)

    def run(self):
        """Prepare data, verify its distribution, then run all sum() checks."""
        self.prepare_datas_of_distribute()
        self.check_distribute_datas()
        self.check_sum_status()
        self.distribute_agg_query()

    def stop(self):
        """Close the SQL helper and report success."""
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)

# Register a fresh TDTestCase instance with the framework for each platform.
for _register in (tdCases.addWindows, tdCases.addLinux):
    _register(__file__, TDTestCase())
del _register