random-test-multi-threading-3.py 10.9 KB
Newer Older
S
Shuduo Sang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import sys
import random
16
from threading import Thread, Event
S
Shuduo Sang 已提交
17 18 19 20 21 22 23 24 25

from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *

# Shared state mutated by the Test worker threads through `global` statements.
# Names of the most recently created table / super table ("" means none).
last_tb = last_stb = ""
# Rows believed to be in `last_tb`, and the running offset added to the
# base timestamp for the next inserted row.
written = last_timestamp = 0
# Whether the extra column `col` is currently present on `last_stb`, and
# whether the database node is currently stopped.
colAdded = killed = False
S
Shuduo Sang 已提交
29 30


31
class Test(Thread):
    """Worker thread of the randomized multi-threaded database test.

    Three instances cooperate in a round robin driven by three events:
    thread 1 waits on ``dataEvent`` and then sets ``dbEvent``, thread 2 waits
    on ``dbEvent`` and then sets ``queryEvent``, thread 3 waits on
    ``queryEvent`` and then sets ``dataEvent``.  Every operation reads and
    updates the module-level bookkeeping globals (``last_tb``, ``last_stb``,
    ``written``, ``last_timestamp``, ``colAdded``, ``killed``).
    """

    def __init__(self, threadId, name, events, q):
        """Store the worker configuration.

        threadId -- 1 (data ops), 2 (db ops) or 3 (query ops); see run().
        name     -- thread name.
        events   -- iterable of exactly three events, unpacked in the order
                    (dataEvent, dbEvent, queryEvent).
        q        -- queue-like object used by thread 1 to report results.
        """
        Thread.__init__(self)
        self.threadId = threadId
        self.name = name
        self.dataEvent, self.dbEvent, self.queryEvent = events
        self.q = q

    def create_table(self):
        """Create a plain table named after the current time in milliseconds.

        Returns 0 on success, when the name equals the previous table name, or
        when the failure is the expected one while the database is stopped;
        returns -1 on any other failure.
        """
        global last_tb
        global written
        global killed
        tdLog.info("create_table")

        current_tb = "tb%d" % int(round(time.time() * 1000))
        if current_tb == last_tb:
            # Same millisecond as the previous table: nothing new to create.
            return 0

        tdLog.info("will create table %s" % current_tb)
        try:
            tdSql.execute(
                'create table %s (ts timestamp, speed int, c2 nchar(10))' %
                current_tb)
        except Exception as e:
            # Guard against exceptions raised without arguments.
            reason = e.args[0] if e.args else ""
            tdLog.info("killed: %d error: %s" % (killed, reason))
            if killed and (reason == 'network unavailable'):
                tdLog.info("database killed, expect failed")
                return 0
            return -1

        last_tb = current_tb
        written = 0
        killed = False
        return 0

    def insert_data(self):
        """Insert 10 batches of 1000 rows into the last created table.

        Returns 0 on success, when there is no table to write to, or when an
        insert fails while the database is flagged as killed; returns -1 on
        any other insert failure.
        """
        global last_tb
        global written
        global last_timestamp
        tdLog.info("insert_data")

        if last_tb == "":
            tdLog.info("no table, create first")
            self.create_table()

        start_time = 1500000000000

        tdLog.info("will insert data to table")
        for _ in range(0, 10):
            insertRows = 1000
            tdLog.info("insert %d rows to %s" % (insertRows, last_tb))

            for _ in range(0, insertRows):
                if last_tb == "":
                    tdLog.info("no table, return")
                    return 0
                try:
                    tdSql.execute(
                        'insert into %s values (%d + %da, %d, "test")' %
                        (last_tb, start_time, last_timestamp, last_timestamp))
                    written = written + 1
                    last_timestamp = last_timestamp + 1
                except Exception as e:
                    if killed:
                        tdLog.info(
                            "database killed, expect failed %s" %
                            (e.args[0] if e.args else ""))
                        return 0
                    tdLog.info(repr(e))
                    return -1
        return 0

    def query_data(self):
        """Query the last table and check its row count against `written`.

        Skipped when the database is flagged as killed or no table exists.
        Always returns 0.
        """
        global last_tb
        global killed
        tdLog.info("query_data")

        if not killed and last_tb != "":
            tdLog.info("query data from table")
            tdSql.query("select * from %s" % last_tb)
            tdSql.checkRows(written)
        return 0

    def create_stable(self):
        """Create a super table plus one child table and insert one row.

        The new child table becomes `last_tb`.  Always returns 0.
        """
        global last_tb
        global last_stb
        global written
        global last_timestamp
        global colAdded
        tdLog.info("create_stable")

        current_stb = "stb%d" % int(round(time.time() * 1000))
        if current_stb == last_stb:
            return 0

        tdLog.info("will create stable %s" % current_stb)
        tdSql.execute(
            'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' %
            current_stb)
        last_stb = current_stb
        # The fresh super table does not have the optional column `col`.
        colAdded = False

        current_tb = "tb%d" % int(round(time.time() * 1000))
        tdSql.execute(
            "create table %s using %s tags (1, '表1')" %
            (current_tb, last_stb))
        last_tb = current_tb
        written = 0

        start_time = 1500000000000

        tdSql.execute(
            "insert into %s values (%d+%da, 27, '我是nchar字符串')" %
            (last_tb, start_time, last_timestamp))
        written = written + 1
        last_timestamp = last_timestamp + 1
        return 0

    def drop_stable(self):
        """Drop the last super table, if any, and forget the tracked tables.

        Always returns 0.
        """
        global last_stb
        global last_tb
        global written
        global colAdded
        tdLog.info("drop_stable")

        if last_stb == "":
            tdLog.info("no super table")
            return 0

        tdLog.info("will drop last super table")
        tdSql.execute('drop table %s' % last_stb)
        last_stb = ""
        last_tb = ""
        written = 0
        # The column flag described the dropped super table only.
        colAdded = False
        return 0

    def alter_table_to_add_col(self):
        """Add column `col` to the last super table unless already added.

        Always returns 0.
        """
        global last_stb
        global colAdded
        tdLog.info("alter_table_to_add_col")

        if last_stb != "" and not colAdded:
            tdSql.execute(
                "alter table %s add column col binary(20)" %
                last_stb)
            colAdded = True
        return 0

    def alter_table_to_drop_col(self):
        """Drop column `col` from the last super table if it was added.

        Always returns 0.
        """
        global last_stb
        global colAdded
        tdLog.info("alter_table_to_drop_col")

        if last_stb != "" and colAdded:
            tdSql.execute("alter table %s drop column col" % last_stb)
            colAdded = False
        return 0

    def restart_database(self):
        """Stop and start dnode 1, flagging `killed` while it is down.

        Always returns 0.
        """
        global last_tb
        global written
        global killed
        tdLog.info("restart_database")

        tdDnodes.stop(1)
        killed = True
        tdDnodes.start(1)
        tdLog.sleep(10)
        killed = False
        return 0

    def force_restart_database(self):
        """Force-stop and start dnode 1, forgetting the tracked plain table.

        Always returns 0.
        """
        global last_tb
        global written
        global killed
        tdLog.info("force_restart_database")

        tdDnodes.forcestop(1)
        last_tb = ""
        written = 0
        killed = True
        tdDnodes.start(1)
        killed = False
        return 0

    def drop_table(self):
        """Drop the last table, if any, and reset the row counter.

        Always returns 0.
        """
        global last_tb
        global written
        tdLog.info("drop_table")

        # After the first drop `last_tb` is empty, so later iterations of the
        # original ten-round loop are no-ops; the loop is kept as written.
        for _ in range(0, 10):
            if last_tb != "":
                tdLog.info("drop last_tb %s" % last_tb)
                tdSql.execute("drop table %s" % last_tb)
                last_tb = ""
                written = 0
        return 0

    def query_data_from_stable(self):
        """Run a `select *` against the last super table, if any.

        Always returns 0.
        """
        global last_stb
        tdLog.info("query_data_from_stable")

        if last_stb == "":
            tdLog.info("no super table")
            return 0

        tdLog.info("will query data from super table")
        tdSql.execute('select * from %s' % last_stb)
        return 0

    def reset_query_cache(self):
        """Issue `reset query cache` and pause briefly.  Always returns 0."""
        global last_tb
        global written
        tdLog.info("reset_query_cache")

        tdLog.info("reset query cache")
        tdSql.execute("reset query cache")
        tdLog.sleep(1)
        return 0

    def reset_database(self):
        """Force-stop, redeploy and restart dnode 1, then re-prepare the db.

        The bookkeeping globals are cleared the same way delete_datafiles()
        clears them, so later operations do not target tables from before the
        reset.  NOTE(review): assumes deploy()/prepare() leave no user tables
        behind -- confirm against util.dnodes / util.sql.  Always returns 0.
        """
        global last_tb
        global last_stb
        global written
        global killed
        global colAdded
        tdLog.info("reset_database")

        tdDnodes.forcestop(1)
        killed = True
        tdDnodes.deploy(1)
        last_tb = ""
        last_stb = ""
        written = 0
        colAdded = False
        tdDnodes.start(1)
        tdSql.prepare()
        killed = False
        return 0

    def delete_datafiles(self):
        """Force-stop dnode 1, remove its data files, restart and re-prepare.

        Always returns 0.
        """
        global last_tb
        global last_stb
        global written
        global killed
        global colAdded
        tdLog.info("delete_data_files")

        dnodesDir = tdDnodes.getDnodesRootDir()
        tdDnodes.forcestop(1)
        killed = True
        dataDir = dnodesDir + '/dnode1/data/*'
        deleteCmd = 'rm -rf %s' % dataDir
        os.system(deleteCmd)
        last_tb = ""
        last_stb = ""
        written = 0
        # The super table is gone together with the data files.
        colAdded = False

        tdDnodes.start(1)
        tdSql.prepare()
        killed = False
        return 0

    def run(self):
        """Thread body: repeatedly run one random operation for this role.

        Each role blocks on its own event, performs one operation picked at
        random from its dispatch table, then clears the other two events and
        sets the event of the next role.  The loops have no exit condition of
        their own; thread 1 calls tdLog.exit() when an operation returns -1.
        """
        dataOp = {
            1: self.insert_data,
        }

        dbOp = {
            1: self.create_table,
            2: self.create_stable,
            3: self.restart_database,
            4: self.force_restart_database,
            5: self.drop_table,
            6: self.reset_query_cache,
            7: self.reset_database,
            8: self.delete_datafiles,
            9: self.drop_stable,
            10: self.alter_table_to_add_col,
            11: self.alter_table_to_drop_col,
        }

        queryOp = {
            1: self.query_data,
            2: self.query_data_from_stable,
        }

        if self.threadId == 1:
            while True:
                self.dataEvent.wait()
                tdLog.notice("first thread")
                # Upper bound follows the table so new entries get picked up.
                randDataOp = random.randint(1, len(dataOp))
                ret1 = dataOp.get(randDataOp, lambda: "ERROR")()

                if ret1 == -1:
                    self.q.put(-1)
                    tdLog.exit("first thread failed")
                else:
                    self.q.put(1)

                # NOTE(review): nothing visible in this file ever puts -2, so
                # this normally reads back the value put just above.
                if self.q.get() != -2:
                    self.dataEvent.clear()
                    self.queryEvent.clear()
                    self.dbEvent.set()
                else:
                    self.q.put(-1)
                    tdLog.exit("second thread failed, first thread exit too")

        elif self.threadId == 2:
            while True:
                self.dbEvent.wait()
                tdLog.notice("second thread")
                randDbOp = random.randint(1, len(dbOp))
                dbOp.get(randDbOp, lambda: "ERROR")()
                self.dbEvent.clear()
                self.dataEvent.clear()
                self.queryEvent.set()

        elif self.threadId == 3:
            while True:
                self.queryEvent.wait()
                tdLog.notice("third thread")
                randQueryOp = random.randint(1, len(queryOp))
                queryOp.get(randQueryOp, lambda: "ERROR")()
                self.queryEvent.clear()
                self.dbEvent.clear()
                self.dataEvent.set()
S
Shuduo Sang 已提交
352 353 354 355 356 357 358 359 360 361


class TDTestCase:
    """Test case that drives three cooperating `Test` worker threads."""

    def init(self, conn, logSql):
        """Log the start of the case and bind tdSql to a cursor of `conn`."""
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor(), logSql)

    def run(self):
        """Start the data / db / query workers and wait for them to finish.

        Exits through tdLog.exit() if a non-zero result is left in the shared
        queue after the workers have been joined.
        NOTE(review): the worker loops in Test.run have no exit condition of
        their own, so the join() calls may block -- confirm intended.
        """
        # Imported locally because the file's import block does not provide it.
        import queue

        tdSql.prepare()

        # Shared result queue required by Test.__init__ (its fourth argument).
        q = queue.Queue()

        # Only the data event starts set, so the data worker runs first.
        events = [Event() for _ in range(3)]
        events[0].set()
        events[1].clear()
        events[2].clear()

        workers = [
            Test(1, "data operation", events, q),
            Test(2, "db operation", events, q),
            Test(3, "query operation", events, q),
        ]

        for worker in workers:
            worker.start()

        for worker in workers:
            worker.join()

        while not q.empty():
            if q.get() != 0:
                tdLog.exit("failed to end of test")

        tdLog.info("end of test")

    def stop(self):
        """Close the SQL connection and report success."""
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)


# Register a separate TDTestCase instance for each platform, Windows first.
for _register in (tdCases.addWindows, tdCases.addLinux):
    _register(__file__, TDTestCase())