diff --git a/tests/pytest/random-test/random-test-multi-threading-3.py b/tests/pytest/random-test/random-test-multi-threading-3.py index 47c4228a8fb2df18ee8e8ce8b407d3d5b3b83ac8..0c8612bc44b5d18b7ef4d699ac5c03044c9d4002 100644 --- a/tests/pytest/random-test/random-test-multi-threading-3.py +++ b/tests/pytest/random-test/random-test-multi-threading-3.py @@ -24,6 +24,7 @@ last_tb = "" last_stb = "" written = 0 last_timestamp = 0 +colAdded = False class Test (Thread): @@ -140,6 +141,26 @@ class Test (Thread): last_tb = "" written = 0 + def alter_table_to_add_col(self): + tdLog.info("alter_table_to_add_col") + global last_stb + global colAdded + + if last_stb != "" and colAdded == False: + tdSql.execute( + "alter table %s add column col binary(20)" % + last_stb) + colAdded = True + + def alter_table_to_drop_col(self): + tdLog.info("alter_table_to_drop_col") + global last_stb + global colAdded + + if last_stb != "" and colAdded: + tdSql.execute("alter table %s drop column col" % last_stb) + colAdded = False + def restart_database(self): tdLog.info("restart_database") global last_tb @@ -235,6 +256,8 @@ class Test (Thread): 7: self.reset_database, 8: self.delete_datafiles, 9: self.drop_stable, + 10: self.alter_table_to_add_col, + 11: self.alter_table_to_drop_col, } queryOp = { @@ -256,7 +279,7 @@ class Test (Thread): while True: self.dbEvent.wait() tdLog.notice("second thread") - randDbOp = random.randint(1, 9) + randDbOp = random.randint(1, 11) dbOp.get(randDbOp, lambda: "ERROR")() self.dbEvent.clear() self.dataEvent.clear() diff --git a/tests/pytest/random-test/random-test-multi-threading.py b/tests/pytest/random-test/random-test-multi-threading.py index 65b6dcd948ca6a789d66f8c20ca3c2397bff0974..ff72aa0ea670769232e59181e554719850959ba9 100644 --- a/tests/pytest/random-test/random-test-multi-threading.py +++ b/tests/pytest/random-test/random-test-multi-threading.py @@ -14,6 +14,7 @@ import sys import random import threading +import queue from util.log import * from util.cases import * @@ -24,13 +25,16 @@ last_tb = "" last_stb = "" written = 0 last_timestamp = 0 +colAdded = False +killed = False class Test (threading.Thread): - def __init__(self, threadId, name): + def __init__(self, threadId, name, q): threading.Thread.__init__(self) self.threadId = threadId self.name = name + self.q = q self.threadLock = threading.Lock() @@ -38,11 +42,12 @@ class Test (threading.Thread): tdLog.info("create_table") global last_tb global written + global killed current_tb = "tb%d" % int(round(time.time() * 1000)) if (current_tb == last_tb): - return + return 0 else: tdLog.info("will create table %s" % current_tb) @@ -52,8 +57,14 @@ class Test (threading.Thread): current_tb) last_tb = current_tb written = 0 + killed = False except Exception as e: - tdLog.info(repr(e)) + tdLog.info("killed: %d error: %s" % (killed, e.args[0])) + if killed and (e.args[0] == 'network unavailable'): + tdLog.info("database killed, expect failed") + return 0 + return -1 + return 0 def insert_data(self): tdLog.info("insert_data") @@ -75,22 +86,34 @@ class Test (threading.Thread): for j in range(0, insertRows): if (last_tb == ""): tdLog.info("no table, return") - return - tdSql.execute( - 'insert into %s values (%d + %da, %d, "test")' % - (last_tb, start_time, last_timestamp, last_timestamp)) - written = written + 1 - last_timestamp = last_timestamp + 1 + return 0 + + try: + tdSql.execute( + 'insert into %s values (%d + %da, %d, "test")' % + (last_tb, start_time, last_timestamp, last_timestamp)) + written = written + 1 + last_timestamp = last_timestamp + 1 + except Exception as e: + if killed: + tdLog.info( + "database killed, expect failed %s" % + e.args[0]) + return 0 + tdLog.info(repr(e)) + return -1 + return 0 def query_data(self): tdLog.info("query_data") global last_tb - global written + global killed - if (written > 0): + if not killed and last_tb != "": tdLog.info("query data from table") tdSql.query("select * from %s" % last_tb) tdSql.checkRows(written) + return 0 def create_stable(self): tdLog.info("create_stable") @@ -101,9 +124,7 @@ class Test (threading.Thread): current_stb = "stb%d" % int(round(time.time() * 1000)) - if (current_stb == last_stb): - return - else: + if (current_stb != last_stb): tdLog.info("will create stable %s" % current_stb) tdLog.info( 'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' % @@ -131,6 +152,8 @@ class Test (threading.Thread): written = written + 1 last_timestamp = last_timestamp + 1 + return 0 + def drop_stable(self): tdLog.info("drop_stable") global last_stb @@ -139,31 +162,63 @@ class Test (threading.Thread): if (last_stb == ""): tdLog.info("no super table") - return else: - tdLog.info("will drop last super table") + tdLog.info("will drop last super table %s" % last_stb) tdSql.execute('drop table %s' % last_stb) last_stb = "" last_tb = "" written = 0 + return 0 + + def alter_table_to_add_col(self): + tdLog.info("alter_table_to_add_col") + global last_stb + global colAdded + + if last_stb != "" and colAdded == False: + tdSql.execute( + "alter table %s add column col binary(20)" % + last_stb) + colAdded = True + return 0 + + def alter_table_to_drop_col(self): + tdLog.info("alter_table_to_drop_col") + global last_stb + global colAdded + + if last_stb != "" and not colAdded: + tdSql.execute("alter table %s drop column col" % last_stb) + colAdded = False + return 0 def restart_database(self): tdLog.info("restart_database") global last_tb global written + global killed tdDnodes.stop(1) + killed = True tdDnodes.start(1) -# tdLog.sleep(5) + tdLog.sleep(10) + killed = False + return 0 def force_restart_database(self): tdLog.info("force_restart_database") global last_tb global written + global killed tdDnodes.forcestop(1) + last_tb = "" + written = 0 + killed = True tdDnodes.start(1) # tdLog.sleep(10) + killed = False + return 0 def drop_table(self): tdLog.info("drop_table") @@ -176,6 +231,7 @@ class Test (threading.Thread): tdSql.execute("drop table %s" % last_tb) last_tb = "" written = 0 + return 0 def query_data_from_stable(self): tdLog.info("query_data_from_stable") @@ -183,10 +239,10 @@ class Test (threading.Thread): if (last_stb == ""): tdLog.info("no super table") - return else: tdLog.info("will query data from super table") tdSql.execute('select * from %s' % last_stb) + return 0 def reset_query_cache(self): tdLog.info("reset_query_cache") @@ -196,38 +252,44 @@ class Test (threading.Thread): tdLog.info("reset query cache") tdSql.execute("reset query cache") # tdLog.sleep(1) + return 0 def reset_database(self): tdLog.info("reset_database") global last_tb global last_stb global written + global killed tdDnodes.forcestop(1) + killed = True tdDnodes.deploy(1) tdDnodes.start(1) tdSql.prepare() - last_tb = "" - last_stb = "" - written = 0 + killed = False + return 0 def delete_datafiles(self): tdLog.info("delete_data_files") global last_tb global last_stb global written + global killed dnodesDir = tdDnodes.getDnodesRootDir() tdDnodes.forcestop(1) dataDir = dnodesDir + '/dnode1/data/*' deleteCmd = 'rm -rf %s' % dataDir os.system(deleteCmd) - - tdDnodes.start(1) - tdSql.prepare() last_tb = "" last_stb = "" written = 0 + killed = True + + tdDnodes.start(1) + tdSql.prepare() + killed = False + return 0 def run(self): dataOp = { @@ -246,6 +308,8 @@ class Test (threading.Thread): 7: self.reset_database, 8: self.delete_datafiles, 9: self.drop_stable, + 10: self.alter_table_to_add_col, + 11: self.alter_table_to_drop_col, } if (self.threadId == 1): @@ -253,16 +317,38 @@ class Test (threading.Thread): self.threadLock.acquire() tdLog.notice("first thread") randDataOp = random.randint(1, 3) - dataOp.get(randDataOp, lambda: "ERROR")() - self.threadLock.release() + ret1 = dataOp.get(randDataOp, lambda: "ERROR")() + + if ret1 == -1: + self.q.put(-1) + tdLog.exit("first thread failed") + else: + self.q.put(1) + + if (self.q.get() != -2): + self.threadLock.release() + else: + self.q.put(-1) + tdLog.exit("second thread failed, first thread exit too") elif (self.threadId == 2): while True: - tdLog.notice("second thread") self.threadLock.acquire() - randDbOp = random.randint(1, 9) - dbOp.get(randDbOp, lambda: "ERROR")() - self.threadLock.release() + tdLog.notice("second thread") + randDbOp = random.randint(1, 11) + ret2 = dbOp.get(randDbOp, lambda: "ERROR")() + + if ret2 == -1: + self.q.put(-2) + tdLog.exit("second thread failed") + else: + self.q.put(2) + + if (self.q.get() != -1): + self.threadLock.release() + else: + self.q.put(-2) + tdLog.exit("first thread failed, second exit too") class TDTestCase: @@ -273,14 +359,19 @@ class TDTestCase: def run(self): tdSql.prepare() - test1 = Test(1, "data operation") - test2 = Test(2, "db operation") + q = queue.Queue() + test1 = Test(1, "data operation", q) + test2 = Test(2, "db operation", q) test1.start() test2.start() test1.join() test2.join() + while not q.empty(): + if (q.get() != 0): + tdLog.exit("failed to end of test") + tdLog.info("end of test") def stop(self): diff --git a/tests/pytest/stable/query_after_reset.py b/tests/pytest/stable/query_after_reset.py index 2bc171ae5d53af86bb72b142ba54b52c76a9e80c..61f6558b83b95333bf37f1c0df13c3cf8c43d8cb 100644 --- a/tests/pytest/stable/query_after_reset.py +++ b/tests/pytest/stable/query_after_reset.py @@ -126,7 +126,7 @@ class Test: def delete_datafiles(self): tdLog.info("delete data files") dnodesDir = tdDnodes.getDnodesRootDir() - dataDir = dnodesDir + '/dnode1/*' + dataDir = dnodesDir + '/dnode1/data/*' deleteCmd = 'rm -rf %s' % dataDir os.system(deleteCmd) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 3b86a5334396f65b6a5a0f0685403560f3e14499..e282298b7c0dafded323215a2b324cf77d554948 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -41,16 +41,12 @@ class TDSql: def prepare(self): tdLog.info("prepare database:db") s = 'reset query cache' - print(s) self.cursor.execute(s) s = 'drop database if exists db' - print(s) self.cursor.execute(s) s = 'create database db' - print(s) self.cursor.execute(s) s = 'use db' - print(s) self.cursor.execute(s) def error(self, sql): @@ -74,7 +70,6 @@ class TDSql: def query(self, sql): self.sql = sql - print(sql) self.cursor.execute(sql) self.queryResult = self.cursor.fetchall() self.queryRows = len(self.queryResult) @@ -191,7 +186,6 @@ class TDSql: def execute(self, sql): self.sql = sql - print(sql) self.affectedRows = self.cursor.execute(sql) return self.affectedRows