diff --git a/tests/pytest/random-test/random-test-multi-threading-3.py b/tests/pytest/random-test/random-test-multi-threading-3.py index 4d1ef3b11d90cb8cbb72094b8f6a7d9ae091bc59..db85ce2fe058837ecfe12599ddbba418d35e473d 100644 --- a/tests/pytest/random-test/random-test-multi-threading-3.py +++ b/tests/pytest/random-test/random-test-multi-threading-3.py @@ -13,7 +13,7 @@ import sys import random -import threading +from threading import Thread, Event from util.log import * from util.cases import * @@ -23,15 +23,15 @@ from util.dnodes import * last_tb = "" last_stb = "" written = 0 +last_timestamp = 0 -class Test (threading.Thread): - def __init__(self, threadId, name): - threading.Thread.__init__(self) +class Test (Thread): + def __init__(self, threadId, name, events): + Thread.__init__(self) self.threadId = threadId self.name = name - - self.threadLock = threading.Lock() + self.dataEvent, self.dbEvent, self.queryEvent = events def create_table(self): tdLog.info("create_table") @@ -47,7 +47,7 @@ class Test (threading.Thread): try: tdSql.execute( - 'create table %s (ts timestamp, speed int)' % + 'create table %s (ts timestamp, speed int, c2 nchar(10))' % current_tb) last_tb = current_tb written = 0 @@ -58,21 +58,28 @@ class Test (threading.Thread): tdLog.info("insert_data") global last_tb global written + global last_timestamp if (last_tb == ""): tdLog.info("no table, create first") self.create_table() + start_time = 1500000000000 + tdLog.info("will insert data to table") for i in range(0, 10): insertRows = 1000 tdLog.info("insert %d rows to %s" % (insertRows, last_tb)) for j in range(0, insertRows): - ret = tdSql.execute( - 'insert into %s values (now + %dm, %d)' % - (last_tb, j, j)) + if (last_tb == ""): + tdLog.info("no table, return") + return + tdSql.execute( + 'insert into %s values (%d + %da, %d, "test")' % + (last_tb, start_time, last_timestamp, last_timestamp)) written = written + 1 + last_timestamp = last_timestamp + 1 def query_data(self): tdLog.info("query_data") @@ -89,6 +96,7 @@ class Test (threading.Thread): global last_tb global last_stb global written + global last_timestamp current_stb = "stb%d" % int(round(time.time() * 1000)) @@ -106,10 +114,15 @@ class Test (threading.Thread): "create table %s using %s tags (1, '表1')" % (current_tb, last_stb)) last_tb = current_tb + written = 0 + + start_time = 1500000000000 + tdSql.execute( - "insert into %s values (now, 27, '我是nchar字符串')" % - last_tb) + "insert into %s values (%d+%da, 27, '我是nchar字符串')" % + (last_tb, start_time, last_timestamp)) written = written + 1 + last_timestamp = last_timestamp + 1 def drop_stable(self): tdLog.info("drop_stable") @@ -179,15 +192,16 @@ class Test (threading.Thread): tdDnodes.forcestop(1) tdDnodes.deploy(1) + tdDnodes.start(1) + tdSql.prepare() last_tb = "" last_stb = "" written = 0 - tdDnodes.start(1) - tdSql.prepare() def delete_datafiles(self): tdLog.info("delete_data_files") global last_tb + global last_stb global written dnodesDir = tdDnodes.getDnodesRootDir() @@ -195,16 +209,15 @@ class Test (threading.Thread): deleteCmd = 'rm -rf %s' % dataDir os.system(deleteCmd) - last_tb = "" - written = 0 tdDnodes.start(1) tdSql.prepare() + last_tb = "" + last_stb = "" + written = 0 def run(self): dataOp = { 1: self.insert_data, - 2: self.query_data, - 3: self.query_data_from_stable, } dbOp = { @@ -226,26 +239,33 @@ class Test (threading.Thread): if (self.threadId == 1): while True: - self.threadLock.acquire() + self.dataEvent.wait() tdLog.notice("first thread") - randDataOp = random.randint(1, 3) + randDataOp = random.randint(1, 1) dataOp.get(randDataOp, lambda: "ERROR")() - self.threadLock.release() + self.dataEvent.clear() + self.queryEvent.clear() + self.dbEvent.set() elif (self.threadId == 2): while True: + self.dbEvent.wait() tdLog.notice("second thread") - self.threadLock.acquire() randDbOp = random.randint(1, 9) dbOp.get(randDbOp, lambda: "ERROR")() - self.threadLock.release() + self.dbEvent.clear() + self.dataEvent.clear() + self.queryEvent.set() + elif (self.threadId == 3): while True: + self.queryEvent.wait() tdLog.notice("third thread") - self.threadLock.acquire() randQueryOp = random.randint(1, 9) queryOp.get(randQueryOp, lambda: "ERROR")() - self.threadLock.release() + self.queryEvent.clear() + self.dbEvent.clear() + self.dataEvent.set() class TDTestCase: @@ -256,13 +276,19 @@ class TDTestCase: def run(self): tdSql.prepare() - test1 = Test(1, "data operation") - test2 = Test(2, "db operation") - test2 = Test(3, "query operation") + events = [Event() for _ in range(3)] + events[0].set() + events[1].clear() + events[1].clear() + + test1 = Test(1, "data operation", events) + test2 = Test(2, "db operation", events) + test3 = Test(3, "query operation", events) test1.start() test2.start() test3.start() + test1.join() test2.join() test3.join() diff --git a/tests/pytest/random-test/random-test-multi-threading.py b/tests/pytest/random-test/random-test-multi-threading.py index 1c06f3a1ddd5bb527bd79fb8e15d003e576a466e..7d1a8a155dac8b427ec184b544632c747a6eca6a 100644 --- a/tests/pytest/random-test/random-test-multi-threading.py +++ b/tests/pytest/random-test/random-test-multi-threading.py @@ -23,6 +23,7 @@ from util.dnodes import * last_tb = "" last_stb = "" written = 0 +last_timestamp = 0 class Test (threading.Thread): @@ -47,7 +48,7 @@ class Test (threading.Thread): try: tdSql.execute( - 'create table %s (ts timestamp, speed int)' % + 'create table %s (ts timestamp, speed int, c1 nchar(10))' % current_tb) last_tb = current_tb written = 0 @@ -58,21 +59,28 @@ class Test (threading.Thread): tdLog.info("insert_data") global last_tb global written + global last_timestamp if (last_tb == ""): tdLog.info("no table, create first") self.create_table() + start_time = 1500000000000 + tdLog.info("will insert data to table") for i in range(0, 10): insertRows = 1000 tdLog.info("insert %d rows to %s" % (insertRows, last_tb)) for j in range(0, insertRows): - ret = tdSql.execute( - 'insert into %s values (now + %dm, %d)' % - (last_tb, j, j)) + if (last_tb == ""): + tdLog.info("no table, return") + return + tdSql.execute( + 'insert into %s values (%d + %da, %d, "test")' % + (last_tb, start_time, last_timestamp, last_timestamp)) written = written + 1 + last_timestamp = last_timestamp + 1 def query_data(self): tdLog.info("query_data") @@ -89,6 +97,7 @@ class Test (threading.Thread): global last_tb global last_stb global written + global last_timestamp current_stb = "stb%d" % int(round(time.time() * 1000)) @@ -106,10 +115,15 @@ class Test (threading.Thread): "create table %s using %s tags (1, '表1')" % (current_tb, last_stb)) last_tb = current_tb + written = 0 + + start_time = 1500000000000 + tdSql.execute( - "insert into %s values (now, 27, '我是nchar字符串')" % - last_tb) + "insert into %s values (%d+%da, 27, '我是nchar字符串')" % + (last_tb, start_time, last_timestamp)) written = written + 1 + last_timestamp = last_timestamp + 1 def drop_stable(self): tdLog.info("drop_stable") @@ -130,7 +144,7 @@ class Test (threading.Thread): tdDnodes.stop(1) tdDnodes.start(1) - tdLog.sleep(5) +# tdLog.sleep(5) def force_restart_database(self): tdLog.info("force_restart_database") @@ -139,7 +153,7 @@ class Test (threading.Thread): tdDnodes.forcestop(1) tdDnodes.start(1) - tdLog.sleep(10) +# tdLog.sleep(10) def drop_table(self): tdLog.info("drop_table") @@ -171,7 +185,7 @@ class Test (threading.Thread): tdLog.info("reset query cache") tdSql.execute("reset query cache") - tdLog.sleep(1) +# tdLog.sleep(1) def reset_database(self): tdLog.info("reset_database") @@ -181,15 +195,16 @@ class Test (threading.Thread): tdDnodes.forcestop(1) tdDnodes.deploy(1) + tdDnodes.start(1) + tdSql.prepare() last_tb = "" last_stb = "" written = 0 - tdDnodes.start(1) - tdSql.prepare() def delete_datafiles(self): tdLog.info("delete_data_files") global last_tb + global last_stb global written dnodesDir = tdDnodes.getDnodesRootDir() @@ -197,11 +212,12 @@ class Test (threading.Thread): deleteCmd = 'rm -rf %s' % dataDir os.system(deleteCmd) - last_tb = "" - written = 0 tdDnodes.start(1) - tdLog.sleep(10) +# tdLog.sleep(10) tdSql.prepare() + last_tb = "" + last_stb = "" + written = 0 def run(self): dataOp = { diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 4c839d87a3722b50309f1847e96fdfcd96885138..50d054a3015696cd6f202d8b0db4cfec16a833bb 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -160,7 +160,7 @@ class TDDnode: self.cfg("logDir", self.logDir) self.cfg("numOfLogLines", "100000000") self.cfg("mnodeEqualVnodeNum", "0") - self.cfg("clog", "1") + self.cfg("walLevel", "1") self.cfg("statusInterval", "1") self.cfg("numOfTotalVnodes", "64") self.cfg("numOfMnodes", "3")