distribute_agg_apercentile.py 7.4 KB
Newer Older
1 2 3 4
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
C
cpwu 已提交
5
import random
6 7 8 9


class TDTestCase:

C
cpwu 已提交
10
    # Configuration overrides declared by this case: per-vnode table limits and
    # the table increment step are all set to 2.
    # NOTE(review): presumably read by the test framework when it deploys the
    # server, so that child tables end up on several vnodes — confirm.
    updatecfgDict = {"maxTablesPerVnode":2 ,"minTablesPerVnode":2,"tableIncStepPerVnode":2 }
11 12 13 14 15 16
    def init(self, conn, logSql):
        """Bind the shared SQL helper to a cursor and reset per-case state.

        Args:
            conn: Connection object; only ``conn.cursor()`` is used here.
            logSql: Accepted for interface compatibility; not used in this method.
        """
        start_message = "start to execute %s" % __file__
        tdLog.debug(start_message)

        cursor = conn.cursor()
        tdSql.init(cursor)

        # Filled in later by check_distribute_datas() with {vgroup_id: [table names]}.
        self.vnode_disbutes = None
        # Base timestamp (used by distribute_agg_query() for the join-table rows).
        self.ts = 1537146000000

C
cpwu 已提交
17 18
    def prepare_datas_of_distribute(self, dbname="testdb"):
        """Create the test database, super table and child tables, then load rows.

        Creates ``{dbname}`` with 5 vgroups, the super table ``stb1`` and the
        child tables ``ct1`` .. ``ct20``, and inserts the rows the aggregate
        checks later query.

        Args:
            dbname: Name of the database to create and populate.
        """
        # Database and super table.
        tdSql.execute(f"create database if not exists {dbname} keep 3650 duration 1000 vgroups 5")
        tdSql.execute(f" use {dbname} ")
        tdSql.execute(
            f'''create table {dbname}.stb1
            (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
            tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
            '''
        )

        # Child tables ct1..ct20; tag values are derived from the loop index.
        for i in range(20):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')

        # ct1 receives rows spaced 10 seconds apart, ct4 rows spaced 90 days apart.
        for i in range(9):
            tdSql.execute(
                f"insert into {dbname}.ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )
            tdSql.execute(
                f"insert into {dbname}.ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
            )

        # All remaining child tables get nine rows whose values are offset by
        # the table index; ct1 and ct4 were already populated above.
        for i in range(1, 21):
            if i in (1, 4):
                continue
            tbname = f"{dbname}.ct{i}"
            for j in range(9):
                tdSql.execute(
                    f"insert into {tbname} values ( now()-{(i+j)*10}s, {1*(j+i)}, {11111*(j+i)}, {111*(j+i)}, {11*(j)}, {1.11*(j+i)}, {11.11*(j+i)}, {(j+i)%2}, 'binary{j}', 'nchar{j}', now()+{1*j}a )"
                )

        # Extra ct1 rows: one all-zero row, plus rows with negative values and
        # individual NULL columns.
        ct1_extra_rows = (
            f"insert into {dbname}.ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )",
            f"insert into {dbname}.ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )",
            f"insert into {dbname}.ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )",
            f"insert into {dbname}.ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )",
        )
        for statement in ct1_extra_rows:
            tdSql.execute(statement)

        # Extra ct4 rows in which every data column is NULL.
        ct4_null_rows = (
            f"insert into {dbname}.ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ",
            f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ",
            f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL  ) ",
        )
        for statement in ct4_null_rows:
            tdSql.execute(statement)

        tdLog.info(" prepare data for distributed_aggregate done! ")

C
cpwu 已提交
60
    def check_distribute_datas(self, dbname="testdb"):
        """Check that the child tables are spread over more than one vgroup.

        Builds a ``{vgroup_id: [table_name, ...]}`` mapping for every ``ct*``
        table in ``dbname``, stores it in ``self.vnode_disbutes``, and calls
        ``tdLog.exit`` unless at least two vgroups hold two or more tables.

        Args:
            dbname: Database whose table distribution is inspected.
        """
        # One empty bucket per vgroup; the first column of each row is the key.
        tdSql.query(f"show {dbname}.vgroups ")
        vnode_tables = {vgroup_row[0]: [] for vgroup_row in tdSql.queryResult}

        # Assign every child table to its vgroup bucket.
        # NOTE(review): column 0 is used as the table name and column 6 as the
        # vgroup id, presumably matching the ins_tables column order — confirm.
        tdSql.query(f"select * from information_schema.ins_tables where db_name = '{dbname}' and table_name like 'ct%'")
        for table_row in tdSql.queryResult:
            vnode_tables[table_row[6]].append(table_row[0])
        self.vnode_disbutes = vnode_tables

        # The data counts as distributed only if two or more vgroups each
        # contain at least two child tables.
        well_filled = sum(1 for tables in vnode_tables.values() if len(tables) >= 2)
        if well_filled < 2:
            tdLog.exit(" the datas of all not satisfy sub_table has been distributed ")
C
cpwu 已提交
85 86

    def distribute_agg_query(self, dbname="testdb"):
        """Run ``apercentile`` queries against the distributed super table and check results.

        Covers basic filters, ``partition by``, ``union all``, a join between
        two tables of a second database ``db``, ``group by``, nested queries,
        and a mix with other aggregate functions. Expected values depend on the
        rows inserted by ``prepare_datas_of_distribute``.

        Args:
            dbname: Database that contains the populated ``stb1`` super table.
        """
        # basic filter
        tdSql.query(f"select apercentile(c1 , 20) from {dbname}.stb1 where c1 is null")
        tdSql.checkRows(1)

        tdSql.query(f"select apercentile(c1 , 20) from {dbname}.stb1 where t1=1")
        tdSql.checkData(0,0,2.800000000)

        tdSql.query(f"select apercentile(c1+c2 ,100) from {dbname}.stb1 where c1 =1 ")
        tdSql.checkData(0,0,11112.000000000)

        tdSql.query(f"select apercentile(c1 ,10 ) from {dbname}.stb1 where tbname=\"ct2\"")
        tdSql.checkData(0,0,2.000000000)

        # one result row per child table
        tdSql.query(f"select apercentile(c1,20) from {dbname}.stb1 partition by tbname")
        tdSql.checkRows(20)

        tdSql.query(f"select apercentile(c1,20) from {dbname}.stb1 where t1> 4  partition by tbname")
        tdSql.checkRows(15)

        # union all
        tdSql.query(f"select apercentile(c1,20) from {dbname}.stb1 union all select apercentile(c1,20) from {dbname}.stb1 ")
        tdSql.checkRows(2)
        tdSql.checkData(0,0,7.389181281)

        # join: two child tables in a separate database, with identical rows
        tdSql.execute(" create database if not exists db ")
        tdSql.execute(" use db ")
        tdSql.execute(" create stable db.st (ts timestamp , c1 int ,c2 float) tags(t1 int) ")
        tdSql.execute(" create table db.tb1 using db.st tags(1) ")
        tdSql.execute(" create table db.tb2 using db.st tags(2) ")

        for i in range(10):
            ts = i*10 + self.ts
            tdSql.execute(f" insert into db.tb1 values({ts},{i},{i}.0)")
            tdSql.execute(f" insert into db.tb2 values({ts},{i},{i}.0)")

        tdSql.query(f"select apercentile(tb1.c1,100), apercentile(tb2.c2,100) from db.tb1 tb1, db.tb2 tb2 where tb1.ts=tb2.ts")
        tdSql.checkRows(1)
        # Column 0 is apercentile(tb1.c1, 100); column 1 is apercentile(tb2.c2, 100).
        # Both source columns hold the values 0..9, so each maximum is 9.
        tdSql.checkData(0,0,9.000000000)
        tdSql.checkData(0,1,9.000000000)

        # group by
        tdSql.execute(f"use {dbname} ")
        tdSql.query(f" select max(c1),c1  from {dbname}.stb1 group by t1 ")
        tdSql.checkRows(20)
        tdSql.query(f" select max(c1),c1  from {dbname}.stb1 group by c1 ")
        tdSql.checkRows(30)
        tdSql.query(f" select max(c1),c2  from {dbname}.stb1 group by c2 ")
        tdSql.checkRows(31)

        # partition by tbname or partition by tag
        # Only executed; no result check is made for this query.
        tdSql.query(f"select apercentile(c1 ,10)from {dbname}.stb1 partition by tbname")

        # nest query for support max
        # NOTE(review): the following check was already disabled in this file;
        # the reason is not visible here.
        #tdSql.query(f"select apercentile(c2+2,10)+1 from (select max(c1) c2  from {dbname}.stb1)")
        #tdSql.checkData(0,0,31.000000000)
        tdSql.query(f"select apercentile(c1+2,10)+1  as c2 from (select ts ,c1 ,c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,7.560701700)
        tdSql.query(f"select apercentile(a+2,10)+1  as c2 from (select ts ,abs(c1) a ,c2  from {dbname}.stb1)")
        tdSql.checkData(0,0,7.560701700)

        # mixup with other functions
        # Result columns: max(c1), count(c1), last(c2), last(c3), spread(c1), apercentile(c1,10).
        tdSql.query(f"select max(c1),count(c1),last(c2,c3),spread(c1), apercentile(c1,10) from {dbname}.stb1")
        tdSql.checkData(0,0,28)
        tdSql.checkData(0,1,184)
        tdSql.checkData(0,2,-99999)
        tdSql.checkData(0,3,-999)
        tdSql.checkData(0,4,28.000000000)
        tdSql.checkData(0,5,4.560701700)

    def run(self):
        """Execute the case: load data, verify its distribution, run the queries."""
        steps = (
            self.prepare_datas_of_distribute,
            self.check_distribute_datas,
            self.distribute_agg_query,
        )
        # Each step relies on the default database name of its method.
        for step in steps:
            step()

C
cpwu 已提交
166

167 168 169 170 171 172
    def stop(self):
        """Close the shared SQL helper and log that this case finished."""
        tdSql.close()
        done_message = "%s successfully executed" % __file__
        tdLog.success(done_message)

# Register a separate instance of this case for Windows and for Linux.
for register_case in (tdCases.addWindows, tdCases.addLinux):
    register_case(__file__, TDTestCase())