distribute_agg_stddev.py 12.5 KB
Newer Older
1 2 3 4 5 6 7 8 9
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import random ,os ,sys
import platform
import math

class TDTestCase:
C
cpwu 已提交
10
    # Table-per-vnode settings for this case. Nothing in this file reads the
    # attribute - presumably the test framework picks it up to configure the
    # server before the case runs; TODO confirm against the runner.
    updatecfgDict = {"maxTablesPerVnode":2 ,"minTablesPerVnode":2,"tableIncStepPerVnode":2 }
11

12
    def init(self, conn, logSql, replicaVar=1):
        """Log the start of the case, bind tdSql to a cursor and reset state.

        Args:
            conn: Connection object; only ``conn.cursor()`` is used here.
            logSql: Accepted for interface compatibility; unused in this method.
            replicaVar: Accepted for interface compatibility; unused in this method.
        """
        tdLog.debug("start to execute %s" % __file__)
        cursor = conn.cursor()
        tdSql.init(cursor)

        # vnode_disbutes is filled in by check_distribute_datas();
        # ts is the base timestamp for the rows inserted in distribute_agg_query().
        self.vnode_disbutes, self.ts = None, 1537146000000


    def check_stddev_functions(self, tbname , col_name):
        """Compare ``stddev(col_name)`` on ``tbname`` with a locally computed value.

        The non-NULL values of the column are fetched and their population
        standard deviation (sum of squared deviations divided by N) is
        computed with NumPy. The value returned by the ``stddev`` query must
        differ from it by less than 1e-4; the outcome is reported through
        ``tdLog.info`` on success and ``tdLog.exit`` otherwise.

        Args:
            tbname: Table to query; interpolated verbatim into the SQL text.
            col_name: Column to aggregate; interpolated verbatim into the SQL text.
        """
        stddev_sql = f"select stddev({col_name}) from {tbname};"
        same_sql = f"select {col_name} from {tbname} where {col_name} is not null "

        tdSql.query(same_sql)
        fetched = np.array(tdSql.queryResult)
        # `!= None` is intentional: it is an element-wise NumPy comparison
        # (`is not None` would test the array object itself). Boolean indexing
        # also flattens the one-column rows into a 1-D vector of values.
        pre_data = fetched[fetched != None]  # noqa: E711
        if (platform.system().lower() == 'windows' and pre_data.dtype == 'int32'):
            # Widen so that summing large values cannot wrap around in int32.
            pre_data = np.array(pre_data, dtype = 'int64')

        # Work on the flattened values rather than on the raw result rows:
        # subtracting a float from a whole row only works through NumPy's
        # reflected operators and leaves a 1-element array for math.sqrt().
        if len(pre_data):
            pre_avg = np.sum(pre_data)/len(pre_data)
            deviations = pre_data - pre_avg
            variance = np.sum(deviations*deviations)/len(pre_data)
            stddev_result = math.sqrt(float(variance))
        else:
            # No reference values: keep the historical expectation of zero.
            stddev_result = 0.0

        tdSql.query(stddev_sql)
        actual = tdSql.queryResult[0][0]

        # %s instead of %d: both numbers are floats and %d would truncate them,
        # hiding the very difference this message is supposed to show.
        message = " sql:%s; row:0 col:0 data:%s , expect:%s"%(stddev_sql,actual,stddev_result)
        if actual is not None and -0.0001 < actual-stddev_result < 0.0001:
            tdLog.info(message)
        else:
            tdLog.exit(message)
44

C
cpwu 已提交
45
    def prepare_datas_of_distribute(self, dbname="testdb"):
        """Create the database, tables and rows that the stddev checks query.

        Creates ``dbname`` with 5 vgroups, the super table ``stb1`` with 20
        child tables ``ct1`` .. ``ct20``, and the regular table ``t1``, then
        inserts the rows the later checks rely on (including NULL rows).

        Args:
            dbname: Name of the database to create and populate.
        """
        # prepare data for 20 tables distributed across different vgroups
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname}")

        stable_ddl = f'''create table {dbname}.stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        tdSql.execute(stable_ddl)

        regular_ddl = f'''
            create table {dbname}.t1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            '''
        tdSql.execute(regular_ddl)

        # Child tables ct1..ct20, each with tag values derived from its index.
        for i in range(20):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')

        # ct1 and ct4 get their own series: ct1 spaced by seconds, ct4 by days.
        for i in range(9):
            tdSql.execute(
                f"insert into {dbname}.ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )
            tdSql.execute(
                f"insert into {dbname}.ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )

        # Every other child table gets nine rows offset by its table index.
        for i in range(1,21):
            if i in (1, 4):
                continue
            tbname = f"ct{i}"
            for j in range(9):
                tdSql.execute(
                    f"insert into {dbname}.{tbname} values ( now()-{(i+j)*10}s, {1*(j+i)}, {11111*(j+i)}, {111*(j+i)}, {11*(j)}, {1.11*(j+i)}, {11.11*(j+i)}, {(j+i)%2}, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
                )

        # Extra ct1 rows (negative values, single NULL cells) followed by
        # all-NULL rows for ct4; executed in exactly this order.
        extra_rows = (
            f"insert into {dbname}.ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )",
            f"insert into {dbname}.ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )",
            f"insert into {dbname}.ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )",
            f"insert into {dbname}.ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )",
            f"insert into {dbname}.ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ",
            f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ",
            f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ",
        )
        for statement in extra_rows:
            tdSql.execute(statement)

        # Fixed-timestamp rows for the regular table t1, sent as one statement.
        t1_rows = f'''insert into {dbname}.t1 values
            ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
            ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
            ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
            ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
            ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
            ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
            ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
            ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
            ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
            ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            '''
        tdSql.execute(t1_rows)

        tdLog.info(" prepare data for distributed_aggregate done! ")
111

C
cpwu 已提交
112
    def check_distribute_datas(self, dbname="testdb"):
        """Record which ``ct*`` tables live in which vgroup and sanity-check it.

        The mapping is stored on ``self.vnode_disbutes``. If fewer than two
        vgroups hold at least two ``ct*`` tables, ``tdLog.exit`` is called.

        Args:
            dbname: Database whose vgroups and tables are inspected.
        """
        # One empty bucket per vgroup, keyed by the first column of each row.
        tdSql.query(f"show {dbname}.vgroups ")
        vnode_tables = {vgroup_row[0]: [] for vgroup_row in tdSql.queryResult}

        # Sort every child table into the bucket of its vgroup.
        tdSql.query(f"select * from information_schema.ins_tables where db_name = '{dbname}' and table_name like 'ct%'")
        for table_row in tdSql.queryResult:
            # Column 0 is stored as the table name; column 6 is used as the
            # vgroup key - presumably ins_tables' vgroup_id column, TODO confirm.
            vnode_tables[table_row[6]].append(table_row[0])
        self.vnode_disbutes = vnode_tables

        # Require at least two vgroups that each hold two or more tables.
        populated = sum(1 for tables in vnode_tables.values() if len(tables) >= 2)
        if populated < 2:
            tdLog.exit(f" the datas of all not satisfy sub_table has been distributed ")
136

C
cpwu 已提交
137
    def check_stddev_distribute_diff_vnode(self,col_name, dbname="testdb"):
        """Check ``stddev(col_name)`` over child tables taken from several vgroups.

        One table is picked at random from every vgroup in
        ``self.vnode_disbutes`` that holds at least two tables. The
        population standard deviation of the column's non-NULL values across
        those tables is computed with NumPy and handed to ``tdSql.checkData``
        as the expected result of the ``stddev`` query on ``stb1``.

        Args:
            col_name: Column to aggregate; interpolated verbatim into the SQL text.
            dbname: Database that contains the super table ``stb1``.
        """
        # One randomly chosen table per sufficiently populated vgroup.
        distribute_tbnames = [
            random.sample(tables, 1)[0]
            for tables in self.vnode_disbutes.values()
            if len(tables) >= 2
        ]
        tbname_filters = " ,".join(f"'{name}'" for name in distribute_tbnames)

        stddev_sql = f"select stddev({col_name}) from {dbname}.stb1 where tbname in ({tbname_filters});"
        same_sql = f"select {col_name}  from {dbname}.stb1 where tbname in ({tbname_filters}) and {col_name} is not null "

        tdSql.query(same_sql)
        fetched = np.array(tdSql.queryResult)
        # `!= None` is intentional: it is an element-wise NumPy comparison
        # (`is not None` would test the array object itself). Boolean indexing
        # also flattens the one-column rows into a 1-D vector of values.
        pre_data = fetched[fetched != None]  # noqa: E711
        if (platform.system().lower() == 'windows' and pre_data.dtype == 'int32'):
            # Widen so that summing large values cannot wrap around in int32.
            pre_data = np.array(pre_data, dtype = 'int64')

        # Work on the flattened values rather than on the raw result rows:
        # subtracting a float from a whole row only works through NumPy's
        # reflected operators and leaves a 1-element array for math.sqrt().
        if len(pre_data):
            pre_avg = np.sum(pre_data)/len(pre_data)
            deviations = pre_data - pre_avg
            variance = np.sum(deviations*deviations)/len(pre_data)
            stddev_result = math.sqrt(float(variance))
        else:
            # No reference values: keep the historical expectation of zero.
            stddev_result = 0.0

        tdSql.query(stddev_sql)
        tdSql.checkData(0,0,stddev_result)

C
cpwu 已提交
175
    def check_stddev_status(self, dbname="testdb"):
        """Run the stddev checks for every numeric data column of ``stb1``.

        Each ``ct*`` table is checked column by column with
        ``check_stddev_functions``; afterwards each column is checked across
        tables from different vgroups with
        ``check_stddev_distribute_diff_vnode``.

        Args:
            dbname: Database to inspect. It is forwarded to the cross-vgroup
                check so that a non-default database is verified consistently.
        """
        # check stddev on each child table individually
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        tablenames = [f"{dbname}.{row[0]}" for row in tdSql.queryResult]

        tdSql.query(f"desc {dbname}.stb1")
        numeric_types = ("INT", "BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE")
        # `desc` lists the tag columns (t1, t2, ...) as well; only the data
        # columns, whose names start with "c", are checked.
        colnames = [
            row[0]
            for row in tdSql.queryResult
            if row[1] in numeric_types and row[0].startswith("c")
        ]

        for tablename in tablenames:
            for colname in colnames:
                self.check_stddev_functions(tablename,colname)

        # check stddev across tables that live in different vnodes
        for colname in colnames:
            self.check_stddev_distribute_diff_vnode(colname, dbname)
G
Ganlin Zhao 已提交
202

C
cpwu 已提交
203
    def distribute_agg_query(self, dbname="testdb"):
        """Run fixed stddev queries and compare them with hard-coded results.

        Covers plain filters, ``union all``, a join on a separate database
        ``db``, ``partition by``, nested queries and a mix with other
        aggregate functions. The expected values match the rows inserted by
        ``prepare_datas_of_distribute``.

        Args:
            dbname: Database that contains the super table ``stb1``.
        """
        def verify(sql, rows=None, cells=()):
            # Run one query, then check the row count (if given) followed by
            # each (row, col, expected) cell, in that order.
            tdSql.query(sql)
            if rows is not None:
                tdSql.checkRows(rows)
            for row, col, expected in cells:
                tdSql.checkData(row, col, expected)

        # basic filter
        verify(f"select stddev(c1) from {dbname}.stb1 ", cells=[(0,0,6.694663959)])
        verify(f"select stddev(a) from (select stddev(c1) a  from {dbname}.stb1 partition by tbname) ", cells=[(0,0,0.156797505)])
        verify(f"select stddev(c1) from {dbname}.stb1 where t1=1", cells=[(0,0,2.581988897)])
        verify(f"select stddev(c1+c2) from {dbname}.stb1 where c1 =1 ", cells=[(0,0,0.000000000)])
        verify(f"select stddev(c1) from {dbname}.stb1 where tbname=\"ct2\"", cells=[(0,0,2.581988897)])
        verify(f"select stddev(c1) from {dbname}.stb1 partition by tbname", rows=20)
        verify(f"select stddev(c1) from {dbname}.stb1 where t1> 4  partition by tbname", rows=15)

        # union all
        verify(
            f"select stddev(c1) from {dbname}.stb1 union all select stddev(c1) from {dbname}.stb1 ",
            rows=2,
            cells=[(0,0,6.694663959)],
        )
        verify(
            f"select stddev(a) from (select stddev(c1) a from {dbname}.stb1 union all select stddev(c1) a  from {dbname}.stb1)",
            rows=1,
            cells=[(0,0,0.000000000)],
        )

        # join: two identical child tables in a separate database
        for statement in (
            " create database if not exists db ",
            " use db ",
            " create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ",
            " create table db.tb1 using db.st tags(1) ",
            " create table db.tb2 using db.st tags(2) ",
        ):
            tdSql.execute(statement)

        for i in range(10):
            ts = i*10 + self.ts
            tdSql.execute(f" insert into db.tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into db.tb2 values({ts},{i},{i}.0)")

        verify(
            "select stddev(tb1.c1), stddev(tb2.c2) from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts",
            rows=1,
            cells=[(0,0,2.872281323), (0,1,2.872281323)],
        )

        # group by: switch back to the main database first
        tdSql.execute(f" use {dbname} ")

        # partition by tbname or partition by tag
        verify(f"select stddev(c1) from {dbname}.stb1 partition by tbname", rows=20)

        # nested queries
        verify(f"select stddev(c2+2)+1 from (select stddev(c1) c2  from {dbname}.stb1)", cells=[(0,0,1.000000000)])
        verify(f"select stddev(c1+2)  as c2 from (select ts ,c1 ,c2  from {dbname}.stb1)", cells=[(0,0,6.694663959)])
        verify(f"select stddev(a+2)  as c2 from (select ts ,abs(c1) a ,c2  from {dbname}.stb1)", cells=[(0,0,6.694663959)])

        # mixed with other functions; last(c2,c3) yields two result columns
        verify(
            f"select max(c1),count(c1),last(c2,c3),sum(c1+c2),avg(c1),stddev(c1) from {dbname}.stb1",
            cells=[
                (0,0,28),
                (0,1,184),
                (0,2,-99999),
                (0,3,-999),
                (0,4,28202310.000000000),
                (0,5,14.086956522),
                (0,6,6.694663959),
            ],
        )

    def run(self):
        """Execute the case: prepare data, then run each check in order."""
        steps = (
            self.prepare_datas_of_distribute,
            self.check_distribute_datas,
            self.check_stddev_status,
            self.distribute_agg_query,
        )
        for step in steps:
            step()

G
Ganlin Zhao 已提交
286

287 288 289 290 291 292
    def stop(self):
        """Close tdSql and log that this file finished successfully."""
        tdSql.close()
        summary = "%s successfully executed" % __file__
        tdLog.success(summary)

# Register this file with tdCases through both its Windows and Linux hooks;
# each call receives its own freshly constructed TDTestCase instance.
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())