# distribute_agg_apercentile.py
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
C
cpwu 已提交
5
import random
6 7 8 9 10 11 12 13 14 15


class TDTestCase:

    def init(self, conn, logSql):
        """Set up the SQL helper and per-case state.

        Args:
            conn: Database connection; a cursor obtained from it is handed
                to ``tdSql``.
            logSql: Accepted for interface compatibility; not used here.
        """
        tdLog.debug(f"start to execute {__file__}")
        tdSql.init(conn.cursor())
        # Base timestamp used when inserting rows for the join queries.
        self.ts = 1537146000000
        # Filled in by check_distribute_datas(): vgroup key -> table names.
        self.vnode_disbutes = None

C
cpwu 已提交
16 17
    def prepare_datas_of_distribute(self, dbname="testdb"):
        """Create the test database, super table and child tables, then load rows.

        The database is created with 5 vgroups and holds one super table
        ``stb1`` with 20 child tables ``ct1`` .. ``ct20``. Every child table
        receives 9 regular rows; ``ct1`` additionally gets four hand-written
        rows and ``ct4`` gets three all-NULL rows.

        Args:
            dbname: Name of the database to create and populate.
        """
        # prepare data for 20 tables distributed over different vgroups
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname} ")
        tdSql.execute(
            f'''create table {dbname}.stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        # child tables ct1 .. ct20, tag values derived from the loop index
        for i in range(20):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')

        # ct1 rows are spaced 10 seconds apart, ct4 rows 90 days apart
        for i in range(9):
            ct1_row = f"insert into {dbname}.ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            tdSql.execute(ct1_row)
            ct4_row = f"insert into {dbname}.ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            tdSql.execute(ct4_row)

        # every other child table; ct1 and ct4 were filled above
        for i in range(1, 21):
            if i in (1, 4):
                continue
            tbname = f"{dbname}.ct{i}"
            for j in range(9):
                tdSql.execute(
                    f"insert into {tbname} values ( now()-{(i+j)*10}s, {1*(j+i)}, {11111*(j+i)}, {111*(j+i)}, {11*(j)}, {1.11*(j+i)}, {11.11*(j+i)}, {(j+i)%2}, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
                )

        # extra rows: negative values and single NULL columns for ct1,
        # all-NULL rows for ct4
        extra_rows = (
            f"insert into {dbname}.ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )",
            f"insert into {dbname}.ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )",
            f"insert into {dbname}.ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )",
            f"insert into {dbname}.ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )",
            f"insert into {dbname}.ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ",
            f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ",
            f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ",
        )
        for statement in extra_rows:
            tdSql.execute(statement)

        tdLog.info(" prepare data for distributed_aggregate done! ")

C
cpwu 已提交
59
    def check_distribute_datas(self, dbname="testdb"):
        """Verify that the child tables are spread over more than one vgroup.

        Builds a mapping from each vgroup row's first column to the names of
        the ``ct*`` tables assigned to it, stores it on
        ``self.vnode_disbutes``, and aborts the case unless at least two
        vgroups hold two or more tables.

        Args:
            dbname: Name of the database to inspect.
        """
        # one empty bucket per vgroup, keyed by the first column of each row
        tdSql.query(f"show {dbname}.vgroups ")
        vnode_tables = {vgroup_row[0]: [] for vgroup_row in tdSql.queryResult}

        # check sub_table of per vnode, make sure sub_table has been distributed
        tdSql.query(f"show {dbname}.tables like 'ct%'")
        for table_row in tdSql.queryResult:
            # column 6 selects the bucket (presumably the vgroup id - verify
            # against the "show tables" output), column 0 is the table name
            vnode_tables[table_row[6]].append(table_row[0])
        self.vnode_disbutes = vnode_tables

        well_filled = sum(1 for tables in vnode_tables.values() if len(tables) >= 2)
        if well_filled < 2:
            tdLog.exit(" the datas of all not satisfy sub_table has been distributed ")
C
cpwu 已提交
84 85

    def distribute_agg_query(self, dbname="testdb"):
        """Run apercentile() queries on the distributed tables and check results.

        Covers filters on columns, tags and tbname, ``partition by``,
        ``union all``, a join between two tables of a second database ``db``,
        ``group by``, nested queries, and apercentile mixed with other
        aggregate functions. Expected values depend on the rows loaded by
        ``prepare_datas_of_distribute``.

        Args:
            dbname: Name of the database that holds super table ``stb1``.
        """
        # basic filter
        tdSql.query(f"select apercentile(c1 , 20) from {dbname}.stb1 where c1 is null")
        tdSql.checkRows(0)

        tdSql.query(f"select apercentile(c1 , 20) from {dbname}.stb1 where t1=1")
        tdSql.checkData(0,0,2.800000000)

        tdSql.query(f"select apercentile(c1+c2 ,100) from {dbname}.stb1 where c1 =1 ")
        tdSql.checkData(0,0,11112.000000000)

        tdSql.query(f"select apercentile(c1 ,10 ) from {dbname}.stb1 where tbname=\"ct2\"")
        tdSql.checkData(0,0,2.000000000)

        tdSql.query(f"select apercentile(c1,20) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)

        tdSql.query(f"select apercentile(c1,20) from {dbname}.stb1 where t1> 4  partition by tbname")
        tdSql.checkRows(15)

        # union all
        tdSql.query(f"select apercentile(c1,20) from {dbname}.stb1 union all select apercentile(c1,20) from {dbname}.stb1 ")
        tdSql.checkRows(2)
        tdSql.checkData(0,0,7.389181281)

        # join
        tdSql.execute(" create database if not exists db ")
        tdSql.execute(" use db ")
        tdSql.execute(" create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ")
        tdSql.execute(" create table db.tb1 using db.st tags(1) ")
        tdSql.execute(" create table db.tb2 using db.st tags(2) ")

        # identical timestamps in both tables so that every row joins
        for i in range(10):
            ts = i*10 + self.ts
            tdSql.execute(f" insert into db.tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into db.tb2 values({ts},{i},{i}.0)")

        tdSql.query(f"select apercentile(tb1.c1,100), apercentile(tb2.c2,100) from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts")
        tdSql.checkRows(1)
        # column 0 is apercentile(tb1.c1,100), column 1 is apercentile(tb2.c2,100);
        # both inputs run from 0 to 9, so each 100th percentile is 9
        tdSql.checkData(0,0,9.000000000)
        tdSql.checkData(0,1,9.000000000)

        # group by
        tdSql.execute(f"use {dbname} ")
        tdSql.query(f" select max(c1),c1  from {dbname}.stb1 group by t1 ")
        tdSql.checkRows(20)
        tdSql.query(f" select max(c1),c1  from {dbname}.stb1 group by c1 ")
        tdSql.checkRows(30)
        tdSql.query(f" select max(c1),c2  from {dbname}.stb1 group by c2 ")
        tdSql.checkRows(31)

        # partition by tbname or partition by tag
        tdSql.query(f"select apercentile(c1 ,10)from {dbname}.stb1 partition by tbname")
        # one result row per child table, as for the 20th-percentile query above
        tdSql.checkRows(20)

        # nest query for support max
        tdSql.query(f"select apercentile(c2+2,10)+1 from (select max(c1) c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,31.000000000)
        tdSql.query(f"select apercentile(c1+2,10)+1  as c2 from (select ts ,c1 ,c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,7.560701700)
        tdSql.query(f"select apercentile(a+2,10)+1  as c2 from (select ts ,abs(c1) a ,c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,7.560701700)

        # mixup with other functions
        # result columns: max(c1), count(c1), last(c2), last(c3), spread(c1), apercentile(c1,10)
        tdSql.query(f"select max(c1),count(c1),last(c2,c3),spread(c1), apercentile(c1,10) from {dbname}.stb1")
        tdSql.checkData(0,0,28)
        tdSql.checkData(0,1,184)
        tdSql.checkData(0,2,-99999)
        tdSql.checkData(0,3,-999)
        tdSql.checkData(0,4,28.000000000)
        tdSql.checkData(0,5,4.560701700)

    def run(self):
        """Execute the case: load data, check distribution, run the queries."""
        steps = (
            self.prepare_datas_of_distribute,
            self.check_distribute_datas,
            self.distribute_agg_query,
        )
        for step in steps:
            step()

C
cpwu 已提交
165

166 167 168 169 170 171
    def stop(self):
        """Close the SQL helper and log that the case finished successfully."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

# Register a separate TDTestCase instance for each platform, Windows first.
for _register in (tdCases.addWindows, tdCases.addLinux):
    _register(__file__, TDTestCase())