# random-test-multi-threading.py
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import os
import random
import sys
import threading
import time

from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *

# Shared test state. The Test worker methods read and update these through
# ``global`` declarations, so every worker thread sees the same values.
last_tb = ""   # name of the most recently created table, "" when there is none
last_stb = ""  # name of the most recently created super table, "" when none
written = 0    # number of rows the test has inserted into ``last_tb``


class Test(threading.Thread):
    """Worker thread that fires random operations at the database under test.

    The worker with ``threadId == 1`` loops over data operations (insert and
    query); the worker with ``threadId == 2`` loops over database operations
    (create, drop, restart, reset, ...).  Both loops are endless, so ``run``
    only returns if an operation raises.

    The names of the most recently created table / super table and the
    expected row count live in the module-level globals ``last_tb``,
    ``last_stb`` and ``written``; they are shared by all workers and are only
    touched while ``threadLock`` is held.
    """

    # One lock shared by *all* workers.  Creating the lock per instance would
    # give every thread its own private lock and therefore no mutual
    # exclusion at all.  It stays reachable as ``self.threadLock``.
    threadLock = threading.Lock()

    # Short pause (seconds) taken outside the lock after each operation so
    # that the other worker gets a chance to acquire it; a thread that
    # releases and immediately re-acquires could otherwise keep winning.
    _HANDOFF_PAUSE = 0.001

    def __init__(self, threadId, name):
        """Create a worker.

        Args:
            threadId: 1 selects the data-operation loop, 2 the
                database-operation loop; any other value makes ``run`` a
                no-op.
            name: thread name.
        """
        threading.Thread.__init__(self)
        self.threadId = threadId
        self.name = name

    def create_table(self):
        """Create a plain table named after the current millisecond time.

        On success the new table becomes ``last_tb`` and ``written`` restarts
        at 0.  A failing statement is logged and otherwise ignored.
        """
        global last_tb
        global written
        tdLog.info("create_table")

        current_tb = "tb%d" % int(round(time.time() * 1000))
        if current_tb == last_tb:
            # Same millisecond as the previous table: nothing new to create.
            return

        tdLog.info("will create table %s" % current_tb)
        try:
            tdSql.execute(
                'create table %s (ts timestamp, speed int)' % current_tb)
        except Exception as e:
            # Deliberate best effort: record the failure and carry on.
            tdLog.info(repr(e))
        else:
            last_tb = current_tb
            written = 0

    def insert_data(self):
        """Insert 10 batches of 1000 rows into ``last_tb``.

        A table is created first when none is known.  ``written`` is
        incremented once per executed insert statement.
        """
        global written
        tdLog.info("insert_data")

        if last_tb == "":
            tdLog.info("no table, create first")
            self.create_table()
            if last_tb == "":
                # create_table logs and swallows failures; without a table
                # name the insert statements below would be malformed.
                tdLog.info("no table available, skip insert")
                return

        tdLog.info("will insert data to table")
        insertRows = 1000
        for _ in range(10):
            tdLog.info("insert %d rows to %s" % (insertRows, last_tb))

            for j in range(insertRows):
                tdSql.execute(
                    'insert into %s values (now + %dm, %d)' %
                    (last_tb, j, j))
                written = written + 1

    def query_data(self):
        """Select everything from ``last_tb`` and check the row count.

        Nothing is queried while ``written`` is 0.
        """
        tdLog.info("query_data")

        if written > 0:
            tdLog.info("query data from table")
            tdSql.query("select * from %s" % last_tb)
            tdSql.checkRows(written)

    def create_stable(self):
        """Create a super table plus one child table holding a single row.

        The super table becomes ``last_stb`` and the child table becomes
        ``last_tb``.
        """
        global last_tb
        global last_stb
        global written
        tdLog.info("create_stable")

        current_stb = "stb%d" % int(round(time.time() * 1000))
        if current_stb == last_stb:
            # Same millisecond as the previous super table.
            return

        tdLog.info("will create stable %s" % current_stb)
        tdSql.execute(
            'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' %
            current_stb)
        last_stb = current_stb

        current_tb = "tb%d" % int(round(time.time() * 1000))
        tdSql.execute(
            "create table %s using %s tags (1, '表1')" %
            (current_tb, last_stb))
        last_tb = current_tb
        tdSql.execute(
            "insert into %s values (now, 27, '我是nchar字符串')" %
            last_tb)
        # ``last_tb`` now names a brand-new table containing exactly this one
        # row, so the expected count restarts here instead of continuing
        # from the previous table's count.
        written = 1

    def drop_stable(self):
        """Drop ``last_stb`` if there is one and forget the tracked table."""
        global last_tb
        global last_stb
        global written
        tdLog.info("drop_stable")

        if last_stb == "":
            tdLog.info("no super table")
            return

        tdLog.info("will drop last super table")
        tdSql.execute('drop table %s' % last_stb)
        last_stb = ""
        # ``last_tb`` may be the child table created together with this
        # super table; in TDengine dropping a super table is expected to
        # remove its child tables as well (NOTE(review): confirm for the
        # server version under test).  Forgetting the table is harmless if
        # it still exists: the next insert simply creates a new one.
        last_tb = ""
        written = 0

    def restart_database(self):
        """Stop and start dnode 1, then wait 5 seconds."""
        tdLog.info("restart_database")

        tdDnodes.stop(1)
        tdDnodes.start(1)
        tdLog.sleep(5)

    def force_restart_database(self):
        """Force-stop and start dnode 1, then wait 10 seconds."""
        tdLog.info("force_restart_database")

        tdDnodes.forcestop(1)
        tdDnodes.start(1)
        tdLog.sleep(10)

    def drop_table(self):
        """Drop ``last_tb`` if there is one and reset the expected row count."""
        global last_tb
        global written
        tdLog.info("drop_table")

        if last_tb != "":
            tdLog.info("drop last_tb %s" % last_tb)
            tdSql.execute("drop table %s" % last_tb)
            last_tb = ""
            written = 0

    def query_data_from_stable(self):
        """Run ``select *`` against ``last_stb`` if there is one."""
        tdLog.info("query_data_from_stable")

        if last_stb == "":
            tdLog.info("no super table")
            return

        tdLog.info("will query data from super table")
        tdSql.execute('select * from %s' % last_stb)

    def reset_query_cache(self):
        """Issue ``reset query cache`` and wait one second."""
        tdLog.info("reset_query_cache")

        tdLog.info("reset query cache")
        tdSql.execute("reset query cache")
        tdLog.sleep(1)

    def reset_database(self):
        """Force-stop, redeploy and start dnode 1, clearing all tracked state."""
        global last_tb
        global last_stb
        global written
        tdLog.info("reset_database")

        tdDnodes.forcestop(1)
        tdDnodes.deploy(1)
        last_tb = ""
        last_stb = ""
        written = 0
        tdDnodes.start(1)
        # presumably re-creates the test database; see tdSql.prepare
        tdSql.prepare()

    def delete_datafiles(self):
        """Remove everything under dnode1's directory and start the dnode.

        All tracked state is cleared because the objects it names are gone.
        """
        global last_tb
        global last_stb
        global written
        tdLog.info("delete_data_files")

        dnodesDir = tdDnodes.getDnodesRootDir()
        dataDir = dnodesDir + '/dnode1/*'
        deleteCmd = 'rm -rf %s' % dataDir
        # NOTE(review): the dnode is not stopped before its files are
        # removed - confirm whether that is intentional.
        os.system(deleteCmd)

        last_tb = ""
        # The super table is deleted along with everything else, so it must
        # be forgotten too (as reset_database does).
        last_stb = ""
        written = 0
        tdDnodes.start(1)
        tdLog.sleep(10)
        tdSql.prepare()

    def run(self):
        """Thread body: endlessly execute random operations under the lock."""
        dataOp = (
            self.insert_data,
            self.query_data,
            self.query_data_from_stable,
        )

        dbOp = (
            self.create_table,
            self.create_stable,
            self.restart_database,
            self.force_restart_database,
            self.drop_table,
            self.reset_query_cache,
            self.reset_database,
            self.delete_datafiles,
            self.drop_stable,
        )

        if self.threadId == 1:
            while True:
                # ``with`` releases the lock even when the operation raises,
                # so a failure here cannot leave the other worker blocked.
                with self.threadLock:
                    tdLog.notice("first thread")
                    random.choice(dataOp)()
                time.sleep(self._HANDOFF_PAUSE)

        elif self.threadId == 2:
            while True:
                tdLog.notice("second thread")
                with self.threadLock:
                    random.choice(dbOp)()
                time.sleep(self._HANDOFF_PAUSE)


class TDTestCase:
    """Test case that runs one data worker and one database worker."""

    def init(self, conn, logSql):
        """Log the start of the case and initialise ``tdSql``.

        Args:
            conn: connection object; a cursor obtained from it is handed to
                ``tdSql.init``.
            logSql: passed through to ``tdSql.init`` unchanged.
        """
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor(), logSql)

    def run(self):
        """Start both worker threads and wait for them to finish."""
        tdSql.prepare()

        workers = [
            Test(1, "data operation"),
            Test(2, "db operation"),
        ]

        # Start every worker before joining any of them so they run
        # side by side.
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        tdLog.info("end of test")

    def stop(self):
        """Close ``tdSql`` and report success."""
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)


# Register this case through tdCases.addWindows and tdCases.addLinux; each
# registration receives its own TDTestCase instance.
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())