# random-test-multi-threading.py
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import os
import random
import sys
import threading
import time

from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *

# State shared by every worker thread (read and written through ``global``).
last_tb = ""    # name of the most recently created table, "" when there is none
last_stb = ""   # name of the most recently created super table, "" when none
written = 0     # number of rows this test has inserted into ``last_tb``


class Test(threading.Thread):
    """Worker thread that fires random operations at the database under test.

    The thread with ``threadId == 1`` repeatedly runs a random *data*
    operation (insert / query); the thread with ``threadId == 2`` repeatedly
    runs a random *database* operation (create / drop / restart / reset).
    Both kinds of operation share the module globals ``last_tb``,
    ``last_stb`` and ``written``.
    """

    # One lock shared by ALL instances.  A per-instance lock would never be
    # contended, so the data thread and the db thread could interleave while
    # mutating the shared globals.
    threadLock = threading.Lock()

    def __init__(self, threadId, name, sleepTime=1):
        """Create a worker.

        Args:
            threadId: 1 selects the data-operation loop, 2 the
                database-operation loop (see ``run``).
            name: thread name.
            sleepTime: value handed to ``tdLog.sleep`` between the
                iterations of ``drop_table`` (presumably seconds -- TODO
                confirm against ``util.log``).
        """
        threading.Thread.__init__(self)
        self.threadId = threadId
        self.name = name
        self.sleepTime = sleepTime

    def create_table(self):
        """Create a table named after the current time in milliseconds.

        Does nothing when the generated name equals ``last_tb``.  On success
        ``last_tb`` points at the new table and ``written`` is reset to 0.
        """
        global last_tb
        global written
        tdLog.info("create_table")

        current_tb = "tb%d" % int(round(time.time() * 1000))
        if current_tb == last_tb:
            return

        tdLog.info("will create table %s" % current_tb)
        tdSql.execute(
            'create table %s (ts timestamp, speed int)' %
            current_tb)
        last_tb = current_tb
        written = 0

    def insert_data(self):
        """Insert 10 batches of 1000 rows into ``last_tb``.

        Creates a table first when none exists.  ``written`` is incremented
        once per executed insert statement.
        """
        global written
        tdLog.info("insert_data")

        if last_tb == "":
            tdLog.info("no table, create first")
            self.create_table()

        tdLog.info("will insert data to table")
        insertRows = 1000
        for _ in range(10):
            tdLog.info("insert %d rows to %s" % (insertRows, last_tb))

            for j in range(insertRows):
                tdSql.execute(
                    'insert into %s values (now + %dm, %d)' %
                    (last_tb, j, j))
                written += 1

    def query_data(self):
        """Query ``last_tb`` and check the row count against ``written``.

        Skipped entirely while ``written`` is 0.
        """
        tdLog.info("query_data")

        if written > 0:
            tdLog.info("query data from table")
            tdSql.query("select * from %s" % last_tb)
            tdSql.checkRows(written)

    def create_stable(self):
        """Create a super table plus one child table holding a single row.

        Does nothing when the generated super-table name equals
        ``last_stb``.  Afterwards ``last_stb`` / ``last_tb`` name the new
        super table / child table and ``written`` is 1.
        """
        global last_tb
        global last_stb
        global written
        tdLog.info("create_stable")

        current_stb = "stb%d" % int(round(time.time() * 1000))
        if current_stb == last_stb:
            return

        tdLog.info("will create stable %s" % current_stb)
        tdSql.execute(
            'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' %
            current_stb)
        last_stb = current_stb

        current_tb = "tb%d" % int(round(time.time() * 1000))
        tdSql.execute(
            "create table %s using %s tags (1, '表1')" %
            (current_tb, last_stb))
        last_tb = current_tb
        tdSql.execute(
            "insert into %s values (now, 27, '我是nchar字符串')" %
            last_tb)
        # ``last_tb`` is a brand-new table that now holds exactly this one
        # row, so the counter restarts at 1 instead of carrying over the
        # count of the previous table.
        written = 1

    def drop_stable(self):
        """Drop the last super table, if there is one."""
        global last_tb
        global last_stb
        global written
        tdLog.info("drop_stable")

        if last_stb == "":
            tdLog.info("no super table")
            return

        tdLog.info("will drop last super table")
        tdSql.execute('drop table %s' % last_stb)
        last_stb = ""
        # ``last_tb`` may be a child of the dropped super table (see
        # ``create_stable``); forget it so later operations do not touch a
        # table that may no longer exist.  Forgetting an ordinary table is
        # harmless: ``insert_data`` simply creates a new one.
        last_tb = ""
        written = 0

    def restart_database(self):
        """Stop dnode 1, start it again, then sleep via ``tdLog.sleep(5)``."""
        tdLog.info("restart_database")

        tdDnodes.stop(1)
        tdDnodes.start(1)
        tdLog.sleep(5)

    def force_restart_database(self):
        """Force-stop dnode 1, start it again, then ``tdLog.sleep(5)``."""
        tdLog.info("force_restart_database")

        tdDnodes.forcestop(1)
        tdDnodes.start(1)
        tdLog.sleep(5)

    def drop_table(self):
        """Drop ``last_tb`` if present, polling 10 times.

        Each of the 10 iterations drops the table when one is recorded
        (resetting ``last_tb`` and ``written``) and then sleeps for
        ``self.sleepTime``.
        """
        global last_tb
        global written
        tdLog.info("drop_table")

        for _ in range(10):
            if last_tb != "":
                tdLog.info("drop last_tb %s" % last_tb)
                tdSql.execute("drop table %s" % last_tb)
                last_tb = ""
                written = 0
            tdLog.sleep(self.sleepTime)

    def query_data_from_stable(self):
        """Run ``select *`` on the last super table, if there is one."""
        tdLog.info("query_data_from_stable")

        if last_stb == "":
            tdLog.info("no super table")
            return

        tdLog.info("will query data from super table")
        tdSql.execute('select * from %s' % last_stb)

    def reset_query_cache(self):
        """Execute ``reset query cache`` and then ``tdLog.sleep(1)``."""
        tdLog.info("reset_query_cache")

        tdLog.info("reset query cache")
        tdSql.execute("reset query cache")
        tdLog.sleep(1)

    def reset_database(self):
        """Force-stop dnode 1, deploy and start it, then re-prepare tdSql.

        All shared bookkeeping is cleared.  NOTE(review): this assumes
        ``tdDnodes.deploy`` / ``tdSql.prepare`` leave no earlier table
        behind -- confirm against ``util.dnodes`` / ``util.sql``.
        """
        global last_tb
        global last_stb
        global written
        tdLog.info("reset_database")

        tdDnodes.forcestop(1)
        tdDnodes.deploy(1)
        last_tb = ""
        last_stb = ""
        written = 0
        tdDnodes.start(1)
        tdSql.prepare()

    def delete_datafiles(self):
        """Delete everything under ``<dnodes root>/dnode1`` and restart.

        The node is force-stopped first, mirroring ``reset_database``, so
        files are not removed underneath a running node and the following
        ``start`` acts on a stopped node.
        """
        global last_tb
        global last_stb
        global written
        tdLog.info("delete_data_files")

        dnodesDir = tdDnodes.getDnodesRootDir()
        tdDnodes.forcestop(1)
        dataDir = dnodesDir + '/dnode1/*'
        deleteCmd = 'rm -rf %s' % dataDir
        os.system(deleteCmd)

        last_tb = ""
        last_stb = ""
        written = 0
        tdDnodes.start(1)
        tdSql.prepare()

    def run(self):
        """Thread body: loop forever running random operations.

        ``threadId == 1`` picks among the data operations, ``threadId == 2``
        among the database operations; any other id returns immediately.
        Each operation runs while holding the shared ``threadLock``, which
        the ``with`` statement releases even if the operation raises.
        """
        dataOps = (
            self.insert_data,
            self.query_data,
            self.query_data_from_stable,
        )

        dbOps = (
            self.create_table,
            self.create_stable,
            self.restart_database,
            self.force_restart_database,
            self.drop_table,
            self.reset_query_cache,
            self.reset_database,
            self.delete_datafiles,
            self.drop_stable,
        )

        if self.threadId == 1:
            while True:
                with self.threadLock:
                    tdLog.notice("first thread")
                    random.choice(dataOps)()

        elif self.threadId == 2:
            while True:
                tdLog.notice("second thread")
                with self.threadLock:
                    random.choice(dbOps)()


class TDTestCase:
    """Test case that runs the data and db worker threads side by side."""

    def init(self, conn, logSql):
        """Log the start of the case and initialise ``tdSql``.

        Args:
            conn: connection object; its ``cursor()`` is handed to
                ``tdSql.init``.
            logSql: passed through unchanged to ``tdSql.init``.
        """
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor(), logSql)

    def run(self):
        """Prepare tdSql, then start both workers and wait for them.

        The workers' ``run`` loops are ``while True``, so the joins below
        block for as long as the worker threads stay alive.
        """
        tdSql.prepare()

        workers = [
            Test(1, "data operation"),
            Test(2, "db operation"),
        ]

        # Start every worker before joining any of them so they run
        # concurrently.
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        tdLog.info("end of test")

    def stop(self):
        """Close ``tdSql`` and report success for this file."""
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)


# Register a separate TDTestCase instance for each platform hook,
# Windows first and then Linux.
for platformHook in ("addWindows", "addLinux"):
    getattr(tdCases, platformHook)(__file__, TDTestCase())