From 51965cdf342946dd48bee3ab446041af37e0fc6c Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 4 Jun 2020 11:55:48 +0800 Subject: [PATCH] add 3 threads case. --- .../random-test-multi-threading-3.py | 281 ++++++++++++++++++ 1 file changed, 281 insertions(+) create mode 100644 tests/pytest/random-test/random-test-multi-threading-3.py diff --git a/tests/pytest/random-test/random-test-multi-threading-3.py b/tests/pytest/random-test/random-test-multi-threading-3.py new file mode 100644 index 0000000000..c7b39ca1a1 --- /dev/null +++ b/tests/pytest/random-test/random-test-multi-threading-3.py @@ -0,0 +1,281 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import random +import threading + +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + +last_tb = "" +last_stb = "" +written = 0 + + +class Test (threading.Thread): + def __init__(self, threadId, name): + threading.Thread.__init__(self) + self.threadId = threadId + self.name = name + + self.threadLock = threading.Lock() + + def create_table(self): + tdLog.info("create_table") + global last_tb + global written + + current_tb = "tb%d" % int(round(time.time() * 1000)) + + if (current_tb == last_tb): + return + else: + tdLog.info("will create table %s" % current_tb) + + try: + tdSql.execute( + 'create table %s (ts timestamp, speed int)' % + current_tb) + last_tb = current_tb + written = 0 + except Exception as e: + tdLog.info(repr(e)) + + def insert_data(self): + tdLog.info("insert_data") + global last_tb + global written + + if (last_tb == ""): + tdLog.info("no table, create first") + self.create_table() + + tdLog.info("will insert data to table") + for i in range(0, 10): + insertRows = 1000 + tdLog.info("insert %d rows to %s" % (insertRows, last_tb)) + + for j in range(0, insertRows): + ret = tdSql.execute( + 'insert into %s values (now + %dm, %d)' % + (last_tb, j, j)) + written = written + 1 + + def query_data(self): + tdLog.info("query_data") + global last_tb + global written + + if (written > 0): + tdLog.info("query data from table") + tdSql.query("select * from %s" % last_tb) + tdSql.checkRows(written) + + def create_stable(self): + tdLog.info("create_stable") + global last_tb + global last_stb + global written + + current_stb = "stb%d" % int(round(time.time() * 1000)) + + if (current_stb == last_stb): + return + else: + tdLog.info("will create stable %s" % current_stb) + tdSql.execute( + 'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' % + current_stb) + last_stb = current_stb + + current_tb = "tb%d" % int(round(time.time() * 1000)) + tdSql.execute( + "create table %s using %s tags (1, '表1')" % + (current_tb, last_stb)) + last_tb = current_tb + tdSql.execute( + "insert into %s values (now, 27, '我是nchar字符串')" % + last_tb) + written = written + 1 + + + def drop_stable(self): + tdLog.info("drop_stable") + global last_stb + + if (last_stb == ""): + tdLog.info("no super table") + return + else: + tdLog.info("will drop last super table") + tdSql.execute('drop table %s' % last_stb) + last_stb = "" + + def restart_database(self): + tdLog.info("restart_database") + global last_tb + global written + + tdDnodes.stop(1) + tdDnodes.start(1) + + def force_restart_database(self): + tdLog.info("force_restart_database") + global last_tb + global written + + tdDnodes.forcestop(1) + tdDnodes.start(1) + + def drop_table(self): + tdLog.info("drop_table") + global last_tb + global written + + for i in range(0, 10): + if (last_tb != ""): + tdLog.info("drop last_tb %s" % last_tb) + tdSql.execute("drop table %s" % last_tb) + last_tb = "" + written = 0 + + + def query_data_from_stable(self): + tdLog.info("query_data_from_stable") + global last_stb + + if (last_stb == ""): + tdLog.info("no super table") + return + else: + tdLog.info("will query data from super table") + tdSql.execute('select * from %s' % last_stb) + + + def reset_query_cache(self): + tdLog.info("reset_query_cache") + global last_tb + global written + + tdLog.info("reset query cache") + tdSql.execute("reset query cache") + tdLog.sleep(1) + + def reset_database(self): + tdLog.info("reset_database") + global last_tb + global last_stb + global written + + tdDnodes.forcestop(1) + tdDnodes.deploy(1) + last_tb = "" + last_stb = "" + written = 0 + tdDnodes.start(1) + tdSql.prepare() + + def delete_datafiles(self): + tdLog.info("delete_data_files") + global last_tb + global written + + dnodesDir = tdDnodes.getDnodesRootDir() + dataDir = dnodesDir + '/dnode1/*' + deleteCmd = 'rm -rf %s' % dataDir + os.system(deleteCmd) + + last_tb = "" + written = 0 + tdDnodes.start(1) + tdSql.prepare() + + def run(self): + dataOp = { + 1: self.insert_data, + 2: self.query_data, + 3: self.query_data_from_stable, + } + + dbOp = { + 1: self.create_table, + 2: self.create_stable, + 3: self.restart_database, + 4: self.force_restart_database, + 5: self.drop_table, + 6: self.reset_query_cache, + 7: self.reset_database, + 8: self.delete_datafiles, + 9: self.drop_stable, + } + + queryOp = { + 1: self.query_data, + 2: self.query_data_from_stable, + } + + if (self.threadId == 1): + while True: + self.threadLock.acquire() + tdLog.notice("first thread") + randDataOp = random.randint(1, 3) + dataOp.get(randDataOp , lambda: "ERROR")() + self.threadLock.release() + + elif (self.threadId == 2): + while True: + tdLog.notice("second thread") + self.threadLock.acquire() + randDbOp = random.randint(1, 9) + dbOp.get(randDbOp, lambda: "ERROR")() + self.threadLock.release() + elif (self.threadId == 3): + while True: + tdLog.notice("third thread") + self.threadLock.acquire() + randQueryOp = random.randint(1, 9) + queryOp.get(randQueryOp, lambda: "ERROR")() + self.threadLock.release() + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.prepare() + + test1 = Test(1, "data operation") + test2 = Test(2, "db operation") + test2 = Test(3, "query operation") + + test1.start() + test2.start() + test3.start() + test1.join() + test2.join() + test3.join() + + tdLog.info("end of test") + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) -- GitLab