# create_retentions.py
from datetime import datetime
import time

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *

# Name of the timestamp column used in the create-stable statements below.
PRIMARY_COL = "ts"

# Data column names, one per column type used by the test tables.
INT_COL = "c_int"
BINT_COL = "c_bint"
SINT_COL = "c_sint"
TINT_COL = "c_tint"
FLOAT_COL = "c_float"
DOUBLE_COL = "c_double"
BOOL_COL = "c_bool"
TINT_UN_COL = "c_utint"
SINT_UN_COL = "c_usint"
BINT_UN_COL = "c_ubint"
INT_UN_COL = "c_uint"
BINARY_COL = "c_binary"
NCHAR_COL = "c_nchar"
TS_COL = "c_ts"

# Tag column name and the list of all tag columns.
INT_TAG = "t_int"

TAG_COL = [INT_TAG]

## insert data args:
# Distance between consecutive generated row timestamps; it is subtracted
# from NOW, so it is expressed in the same unit (milliseconds).
TIME_STEP = 10000
# Wall-clock time in epoch milliseconds, captured once when the module loads.
NOW = int(datetime.timestamp(datetime.now()) * 1000)

# init db/table
DBNAME = "db"
DB1 = "db1"
DB2 = "db2"
DB3 = "db3"
DB4 = "db4"
STBNAME = "stb1"
CTBNAME = "ct1"
NTBNAME = "nt1"

C
cpwu 已提交
45 46 47 48
class TDTestCase:

    def init(self, conn, logSql):
        """Log the start of this case and initialise the shared ``tdSql`` helper.

        Args:
            conn: connection object; only ``conn.cursor()`` is used here.
            logSql: accepted but not used by this method -- ``tdSql.init`` is
                always called with ``False`` as its second argument.
        """
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor(), False)

    @property
    def create_databases_sql_err(self):
        """Return ``create database ... retentions`` statements expected to fail.

        ``test_create_databases`` passes each entry to ``tdSql.error``.
        NOTE(review): the entries appear to cover zero-valued levels, unit
        suffixes like ``y``/``n``/``w``, out-of-order levels and more than
        three levels -- confirm the exact rules against the server docs.
        """
        return [
            "create database db1 retentions 0s:1d",
            "create database db3 retentions 1s:0d",
            "create database db1 retentions 1s:1y",
            "create database db1 retentions 1s:1n",
            "create database db2 retentions 1w:1d ;",
            "create database db5 retentions 1s:1d,3s:3d,2s:2d",
            "create database db1 retentions 1s:1n,2s:2d,3s:3d,4s:4d",
        ]

    @property
    def create_databases_sql_current(self):
        """Return ``create database ... retentions`` statements expected to succeed.

        ``test_create_databases`` runs each entry with ``tdSql.execute``; they
        create ``DB1`` with one retention level and ``DB2`` with three.
        """
        return [
            f"create database {DB1} retentions 1s:1d",
            f"create database {DB2} retentions 1s:1d,2m:2d,3h:3d",
        ]

    @property
    def alter_database_sql(self):
        """Return ``alter database ... retentions`` statements.

        ``test_create_databases`` hands every entry to ``tdSql.error``, i.e.
        each one is expected to be rejected.
        """
        single_level = "alter database db1 retentions 99h:99d"
        multi_level = "alter database db2 retentions 97h:97d,98h:98d,99h:99d,"
        return [single_level, multi_level]

    @property
    def create_stable_sql_err(self, dbname=DB2):
        """Return create-table/stable statements expected to fail.

        ``test_create_stb`` passes each entry to ``tdSql.error`` after issuing
        ``use`` on its target database.

        Because this is a property, callers cannot supply ``dbname``; it always
        takes its default (``DB2``). The parameter is kept so the signature is
        unchanged. The statements in the second group carry no database prefix
        and therefore run against whatever database is currently in use.
        """
        return [
            f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(ceil) watermark 1s max_delay 1m",
            f"create stable {dbname}.stb12 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(count) watermark  1min",
            f"create stable {dbname}.stb13 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay -1s",
            f"create stable {dbname}.stb14 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark -1m",
            f"create stable {dbname}.stb15 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ",
            f"create stable {dbname}.stb16 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) max_delay 1m ",
            f"create stable {dbname}.stb21 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} binary(16)) tags (tag1 int) rollup(avg) watermark 1s",
            f"create stable {dbname}.stb22 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) rollup(avg) max_delay 1m",
            f"create table  {dbname}.ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) rollup(avg) watermark 1s max_delay 1s",
            f"create table  {dbname}.ntb_2 ({PRIMARY_COL} timestamp, {INT_COL} int) ",
            f"create stable {dbname}.stb23 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) ",
            f"create stable {dbname}.stb24 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) ",
            f"create stable {dbname}.stb25 ({PRIMARY_COL} timestamp, {INT_COL} int) ",
            f"create stable {dbname}.stb26 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) ",

            # watermark, max_delay: [0, 900000], [ms, s, m, ?]
            f"create stable stb17 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1u",
            f"create stable stb18 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 1b",
            f"create stable stb19 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 900001ms",
            f"create stable stb20 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 16m",
            f"create stable stb27 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 901s",
            f"create stable stb28 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1h",
            f"create stable stb29 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 0.2h",
            f"create stable stb30 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 0.002d",
        ]

    @property
    def create_stable_sql_current(self):
        """Return ``create stable ... rollup(...)`` statements expected to succeed.

        ``test_create_stb`` executes each entry and then checks that the
        number of rows returned by ``show stables`` equals the length of this
        list. The statements carry no database prefix, so they run against the
        database currently in use.
        """
        return [
            f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(avg)",
            f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 5s max_delay 1m",
            f"create stable stb3 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(max) watermark 5s max_delay 1m",
            f"create stable stb4 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(sum) watermark 5s max_delay 1m",
            f"create stable stb5 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(last) watermark 5s max_delay 1m",
            f"create stable stb6 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m",
            f"create stable stb7 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL})",
        ]

    def test_create_stb(self, db=DB2):
        """Check which create-stable statements are accepted or rejected.

        Args:
            db: database to ``use`` before running the statement lists.

        Every statement in ``create_stable_sql_err`` must fail and every
        statement in ``create_stable_sql_current`` must succeed; afterwards
        ``show stables`` must return one row per successful statement. Finally
        a watermark/max_delay stable is attempted in database ``db`` and must
        be rejected.
        """
        tdSql.execute(f"use {db}")
        for err_sql in self.create_stable_sql_err:
            tdSql.error(err_sql)
        for cur_sql in self.create_stable_sql_current:
            tdSql.execute(cur_sql)
        tdSql.query("show stables")
        # NOTE: a check that "rollup" appears in tdSql.description was disabled here.
        tdSql.checkRows(len(self.create_stable_sql_current))

        # "db" is a normal database, not a rollup database, so creating a
        # rollup stable in it should not be possible.
        tdSql.execute("use db")
        tdSql.error(f"create stable db.nor_db_rollup_stb ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 5s max_delay 1m")

    def test_create_databases(self):
        """Run the database-level retention checks.

        In order: every statement in ``create_databases_sql_err`` must fail,
        every statement in ``create_databases_sql_current`` must succeed, and
        every statement in ``alter_database_sql`` must fail.
        """
        for err_sql in self.create_databases_sql_err:
            tdSql.error(err_sql)
        for cur_sql in self.create_databases_sql_current:
            tdSql.execute(cur_sql)
        for alter_sql in self.alter_database_sql:
            tdSql.error(alter_sql)

    def all_test(self):
        self.test_create_databases()
C
cpwu 已提交
144
        self.test_create_stb()
C
cpwu 已提交
145

C
cpwu 已提交
146
    def __create_tb(self, stb=STBNAME, ctb_num=20, ntbnum=1, rsma=False, dbname=DBNAME, rsma_type="sum"):
        """Create the super table, optional normal tables and the child tables.

        Args:
            stb: super table name.
            ctb_num: number of child tables ``ct1..ct<ctb_num>`` to create.
            ntbnum: number of normal tables ``nt1..nt<ntbnum>``; only used when
                ``rsma`` is false.
            rsma: when true, create the super table with a ``rollup`` clause
                and skip the normal tables.
            dbname: database that receives all tables.
            rsma_type: rollup function name; ``last``/``first`` (compared
                case-insensitively after stripping whitespace) additionally
                get a binary column in the schema.
        """
        tdLog.printNoPrefix("==========step: create table")
        if rsma:
            if rsma_type.lower().strip() in ("last", "first"):
                # last/first variant: numeric columns plus one binary column.
                create_stb_sql = f'''create table {dbname}.{stb}(
                        ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
                        {FLOAT_COL} float, {DOUBLE_COL} double, {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
                        {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned, {BINARY_COL} binary(16)
                    ) tags ({INT_TAG} int) rollup({rsma_type}) watermark 5s,5s max_delay 5s,5s
                    '''
            else:
                # Other rollup functions: numeric columns only.
                create_stb_sql = f'''create table {dbname}.{stb}(
                        ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
                        {FLOAT_COL} float, {DOUBLE_COL} double, {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
                        {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
                    ) tags ({INT_TAG} int) rollup({rsma_type}) watermark 5s,5s max_delay 5s,5s
                    '''
            tdSql.execute(create_stb_sql)
        else:
            # Plain super table with the full column set (no rollup clause).
            create_stb_sql = f'''create table {dbname}.{stb}(
                    ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
                    {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
                    {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
                    {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
                    {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
                ) tags ({INT_TAG} int)
                '''
            tdSql.execute(create_stb_sql)

            # Normal tables share the plain super table's column layout.
            for i in range(ntbnum):
                create_ntb_sql = f'''create table {dbname}.nt{i+1}(
                        ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
                        {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
                        {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
                        {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
                        {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
                    )
                    '''
                tdSql.execute(create_ntb_sql)

        # Child tables are created in both modes; the tag value is the 1-based index.
        for i in range(ctb_num):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )')

    def __insert_data(self, rows, ctb_num=20, dbname=DBNAME, rsma=False, rsma_type="sum"):
        """Insert ``rows`` generated rows into every child table.

        Args:
            rows: number of rows to generate and insert per table.
            ctb_num: number of child tables ``ct1..ct<ctb_num>`` to fill.
            dbname: database containing the tables.
            rsma: when true, build rows matching the rollup schemas and skip
                the normal table; otherwise also insert into ``NTBNAME``.
            rsma_type: rollup function name; for ``last``/``first`` the row
                additionally carries a quoted string value for the binary column.

        Row ``i`` of a child table gets timestamp ``NOW - i * TIME_STEP``; the
        normal table uses a step of ``int(TIME_STEP * 1.2)`` instead.
        """
        tdLog.printNoPrefix("==========step: start inser data into tables now.....")
        data = DataSet()
        data.get_order_set(rows)

        for i in range(rows):
            if rsma:
                if rsma_type.lower().strip() in ("last", "first"):
                    row_data = f'''
                        {data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
                        {data.utint_data[i]}, {data.usint_data[i]}, {data.uint_data[i]}, {data.ubint_data[i]}, '{data.vchar_data[i]}'
                    '''
                else:
                    row_data = f'''
                        {data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
                        {data.utint_data[i]}, {data.usint_data[i]}, {data.uint_data[i]}, {data.ubint_data[i]}
                    '''
            else:
                row_data = f'''
                    {data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
                    {data.bool_data[i]}, '{data.vchar_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {data.utint_data[i]},
                    {data.usint_data[i]}, {data.uint_data[i]}, {data.ubint_data[i]}
                '''
                # The normal table only exists in non-rollup mode.
                tdSql.execute( f"insert into {dbname}.{NTBNAME} values ( {NOW - i * int(TIME_STEP * 1.2)}, {row_data} )" )

            # The same row values go into every child table.
            for j in range(ctb_num):
                tdSql.execute( f"insert into {dbname}.ct{j+1} values ( {NOW - i * TIME_STEP}, {row_data} )" )


    def run(self):
        """Execute the whole retention/rollup scenario.

        Steps, in order:
          0. run ``all_test`` (database- and stable-level syntax checks);
          1. create and fill tables in the normal database;
          2.1 create and fill a rollup stable (default ``sum``) in ``DB3``,
              query it over several time ranges, then drop/add a column and
              drop a child table;
          2.2 repeat creation and queries in ``DB4`` with ``rollup(last)``;
          4. drop ``DB1``/``DB2``, flush ``DBNAME`` and run ``all_test`` again.

        The statement order and the sleeps are significant; do not reorder.
        """
        self.rows = 10
        tdSql.prepare(dbname=DBNAME)

        tdLog.printNoPrefix("==========step0:all check")
        self.all_test()

        tdLog.printNoPrefix("==========step1:create table in normal database")
        tdSql.prepare()
        self.__create_tb()
        self.__insert_data(rows=self.rows)

        tdLog.printNoPrefix("==========step2:create table in rollup database")
        tdLog.printNoPrefix("==========step2.1 : rolluo func is not last/first")
        tdSql.prepare(dbname=DB3, **{"retentions": "1s:1d, 3s:3d, 5s:5d"})

        db3_ctb_num = 10
        self.__create_tb(rsma=True, dbname=DB3, ctb_num=db3_ctb_num, stb=STBNAME)
        self.__insert_data(rows=self.rows, rsma=True, dbname=DB3, ctb_num=db3_ctb_num)
        # Presumably gives the rollup (watermark/max_delay 5s) time to be
        # computed before querying -- TODO confirm.
        time.sleep(6)
        tdSql.query(f"select count(*) from {DB3}.{STBNAME} where ts > now()-5m")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, self.rows * db3_ctb_num)
        # The same count must hold after flushing the database.
        tdSql.execute(f"flush database {DB3}")
        tdSql.query(f"select count(*) from {DB3}.{STBNAME} where ts > now()-5m")
        tdSql.checkData(0, 0, self.rows * db3_ctb_num)
        tdSql.checkRows(1)
        # Wider time ranges (4d and 6d back) must both return rows-1 in the first row.
        tdSql.query(f"select {INT_COL} from {DB3}.{CTBNAME} where ts > now()-4d")
        tdSql.checkData(0, 0, self.rows-1)
        tdSql.query(f"select {INT_COL} from {DB3}.{CTBNAME} where ts > now()-6d")
        tdSql.checkData(0, 0, self.rows-1)

        tdLog.printNoPrefix("==========step2.1.1 : alter stb schemaL drop column")
        tdSql.query(f"select {BINT_COL} from {DB3}.{STBNAME}")
        tdSql.execute(f"alter stable {DB3}.stb1 drop column {BINT_COL}")
        # TODO not support alter stable schema anymore
        # tdSql.error(f"alter stable {DB3}.stb1 drop column {BINT_COL}")
        # After the drop, selecting the column must fail.
        tdSql.error(f"select {BINT_COL} from {DB3}.{STBNAME}")


        tdLog.printNoPrefix("==========step2.1.2 : alter stb schemaL add num_column")
        # TODO not support alter stable schema anymore
        # tdSql.error(f"alter stable {DB3}.stb1 add column {INT_COL}_1 int")
        # The new column must not be selectable before it is added.
        tdSql.error(f"select {INT_COL}_1 from {DB3}.{STBNAME}")
        tdSql.execute(f"alter stable {DB3}.stb1 add column {INT_COL}_1 int")
        tdSql.query(f"select count({INT_COL}_1) from {DB3}.{STBNAME} where _c0 > now-5m")
        tdSql.checkData(0, 0, 0)
        # Insert one row 20s after NOW so the `_c0>{NOW}` filters below select only it.
        tdSql.execute(f"insert into {DB3}.{CTBNAME} ({PRIMARY_COL}, {INT_COL}, {INT_COL}_1) values({NOW}+20s, 111, 112)")
        time.sleep(7)
        # The new row must be returned for each of the 1h, 2d and 7d ranges.
        tdSql.query(f"select _rowts, {INT_COL}, {INT_COL}_1 from {DB3}.{CTBNAME} where _c0 > now()-1h and _c0>{NOW}")
        tdSql.checkRows(1)
        tdSql.checkData(0, 1, 111)
        tdSql.checkData(0, 2, 112)

        tdSql.query(f"select _rowts, {INT_COL}, {INT_COL}_1 from {DB3}.{CTBNAME} where _c0 > now()-2d and _c0>{NOW}")
        tdSql.checkRows(1)
        tdSql.checkData(0, 1, 111)
        tdSql.checkData(0, 2, 112)
        tdSql.query(f"select _rowts, {INT_COL}, {INT_COL}_1 from {DB3}.{CTBNAME} where _c0 > now()-7d and _c0>{NOW}")
        tdSql.checkRows(1)
        tdSql.checkData(0, 1, 111)
        tdSql.checkData(0, 2, 112)
        tdLog.printNoPrefix("==========step2.1.3 : drop child-table")
        tdSql.execute(f"drop table {DB3}.{CTBNAME} ")


        tdLog.printNoPrefix("==========step2.2 : rolluo func is  last/first")
        tdSql.prepare(dbname=DB4, **{"retentions": "1s:1d, 2m:3d, 3m:5d"})

        db4_ctb_num = 10
        tdSql.execute(f"use {DB4}")
        self.__create_tb(rsma=True, dbname=DB4, ctb_num=db4_ctb_num, rsma_type="last")
        self.__insert_data(rows=self.rows, rsma=True, dbname=DB4, ctb_num=db4_ctb_num, rsma_type="last")
        time.sleep(7)
        tdSql.query(f"select count(*) from {DB4}.stb1 where ts > now()-5m")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, self.rows * db4_ctb_num)
        tdSql.execute(f"flush database {DB4}")
        tdSql.query(f"select count(*) from {DB4}.stb1 where ts > now()-5m")
        tdSql.checkRows(1)
        tdSql.checkData(0, 0, self.rows * db4_ctb_num)
        # For the wider ranges only the row count (1 or 2) is checked; the
        # value checks used in step 2.1 are disabled here.
        tdSql.query(f"select {INT_COL} from {DB4}.ct1 where ts > now()-4d")
        tdSql.checkRows_range([1,2])
        tdSql.query(f"select {INT_COL} from {DB4}.ct1 where ts > now()-6d")
        tdSql.checkRows_range([1,2])

        # Drop the databases created by all_test so it can run again below.
        tdSql.execute(f"drop database if exists {DB1} ")
        tdSql.execute(f"drop database if exists {DB2} ")

        # NOTE: a dnode stop/start used to sit here but is disabled; only a
        # flush is performed before re-running the checks.
        tdSql.execute(f"flush database {DBNAME}")

        tdLog.printNoPrefix("==========step4:after wal, all check again ")
        self.all_test()

    def stop(self):
        """Close the shared ``tdSql`` helper, then log that this case succeeded."""
        tdSql.close()
        success_msg = f"{__file__} successfully executed"
        tdLog.success(success_msg)

# Register a separate TDTestCase instance for each platform, keyed by this file's path.
_case_path = __file__
tdCases.addLinux(_case_path, TDTestCase())
tdCases.addWindows(_case_path, TDTestCase())