未验证 提交 9e47c5e8 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2437 from taosdata/feature/sangshuduo/multi-thread-random-test

Feature/sangshuduo/multi thread random test
......@@ -24,19 +24,23 @@ last_tb = ""
last_stb = ""
written = 0
last_timestamp = 0
colAdded = False
killed = False
class Test (Thread):
def __init__(self, threadId, name, events):
def __init__(self, threadId, name, events, q):
Thread.__init__(self)
self.threadId = threadId
self.name = name
self.dataEvent, self.dbEvent, self.queryEvent = events
self.q = q
def create_table(self):
tdLog.info("create_table")
global last_tb
global written
global killed
current_tb = "tb%d" % int(round(time.time() * 1000))
......@@ -51,8 +55,14 @@ class Test (Thread):
current_tb)
last_tb = current_tb
written = 0
killed = False
except Exception as e:
tdLog.info(repr(e))
tdLog.info("killed: %d error: %s" % (killed, e.args[0]))
if killed and (e.args[0] == 'network unavailable'):
tdLog.info("database killed, expect failed")
return 0
return -1
return 0
def insert_data(self):
tdLog.info("insert_data")
......@@ -74,22 +84,33 @@ class Test (Thread):
for j in range(0, insertRows):
if (last_tb == ""):
tdLog.info("no table, return")
return
tdSql.execute(
'insert into %s values (%d + %da, %d, "test")' %
(last_tb, start_time, last_timestamp, last_timestamp))
written = written + 1
last_timestamp = last_timestamp + 1
return 0
try:
tdSql.execute(
'insert into %s values (%d + %da, %d, "test")' %
(last_tb, start_time, last_timestamp, last_timestamp))
written = written + 1
last_timestamp = last_timestamp + 1
except Exception as e:
if killed:
tdLog.info(
"database killed, expect failed %s" %
e.args[0])
return 0
tdLog.info(repr(e))
return -1
return 0
def query_data(self):
tdLog.info("query_data")
global last_tb
global written
global killed
if (written > 0):
if not killed and last_tb != "":
tdLog.info("query data from table")
tdSql.query("select * from %s" % last_tb)
tdSql.checkRows(written)
return 0
def create_stable(self):
tdLog.info("create_stable")
......@@ -123,6 +144,7 @@ class Test (Thread):
(last_tb, start_time, last_timestamp))
written = written + 1
last_timestamp = last_timestamp + 1
return 0
def drop_stable(self):
tdLog.info("drop_stable")
......@@ -139,22 +161,57 @@ class Test (Thread):
last_stb = ""
last_tb = ""
written = 0
return 0
def alter_table_to_add_col(self):
tdLog.info("alter_table_to_add_col")
global last_stb
global colAdded
if last_stb != "" and colAdded == False:
tdSql.execute(
"alter table %s add column col binary(20)" %
last_stb)
colAdded = True
return 0
def alter_table_to_drop_col(self):
tdLog.info("alter_table_to_drop_col")
global last_stb
global colAdded
if last_stb != "" and colAdded:
tdSql.execute("alter table %s drop column col" % last_stb)
colAdded = False
return 0
def restart_database(self):
tdLog.info("restart_database")
global last_tb
global written
global killed
tdDnodes.stop(1)
killed = True
tdDnodes.start(1)
tdLog.sleep(10)
killed = False
return 0
def force_restart_database(self):
tdLog.info("force_restart_database")
global last_tb
global written
global killed
tdDnodes.forcestop(1)
last_tb = ""
written = 0
killed = True
tdDnodes.start(1)
# tdLog.sleep(10)
killed = False
return 0
def drop_table(self):
tdLog.info("drop_table")
......@@ -167,6 +224,7 @@ class Test (Thread):
tdSql.execute("drop table %s" % last_tb)
last_tb = ""
written = 0
return 0
def query_data_from_stable(self):
tdLog.info("query_data_from_stable")
......@@ -178,6 +236,7 @@ class Test (Thread):
else:
tdLog.info("will query data from super table")
tdSql.execute('select * from %s' % last_stb)
return 0
def reset_query_cache(self):
tdLog.info("reset_query_cache")
......@@ -187,39 +246,45 @@ class Test (Thread):
tdLog.info("reset query cache")
tdSql.execute("reset query cache")
tdLog.sleep(1)
return 0
def reset_database(self):
tdLog.info("reset_database")
global last_tb
global last_stb
global written
global killed
tdDnodes.forcestop(1)
killed = True
tdDnodes.deploy(1)
tdDnodes.start(1)
tdSql.prepare()
last_tb = ""
last_stb = ""
written = 0
killed = False
return 0
def delete_datafiles(self):
tdLog.info("delete_data_files")
global last_tb
global last_stb
global written
global killed
dnodesDir = tdDnodes.getDnodesRootDir()
tdDnodes.forcestop(1)
killed = True
dataDir = dnodesDir + '/dnode1/data/*'
deleteCmd = 'rm -rf %s' % dataDir
os.system(deleteCmd)
tdDnodes.start(1)
tdSql.prepare()
last_tb = ""
last_stb = ""
written = 0
tdDnodes.start(1)
tdSql.prepare()
killed = False
return 0
def run(self):
dataOp = {
1: self.insert_data,
......@@ -235,6 +300,8 @@ class Test (Thread):
7: self.reset_database,
8: self.delete_datafiles,
9: self.drop_stable,
10: self.alter_table_to_add_col,
11: self.alter_table_to_drop_col,
}
queryOp = {
......@@ -247,16 +314,28 @@ class Test (Thread):
self.dataEvent.wait()
tdLog.notice("first thread")
randDataOp = random.randint(1, 1)
dataOp.get(randDataOp, lambda: "ERROR")()
self.dataEvent.clear()
self.queryEvent.clear()
self.dbEvent.set()
ret1 = dataOp.get(randDataOp, lambda: "ERROR")()
if ret1 == -1:
self.q.put(-1)
tdLog.exit("first thread failed")
else:
self.q.put(1)
if (self.q.get() != -2):
self.dataEvent.clear()
self.queryEvent.clear()
self.dbEvent.set()
else:
self.q.put(-1)
tdLog.exit("second thread failed, first thread exit too")
elif (self.threadId == 2):
while True:
self.dbEvent.wait()
tdLog.notice("second thread")
randDbOp = random.randint(1, 9)
randDbOp = random.randint(1, 11)
dbOp.get(randDbOp, lambda: "ERROR")()
self.dbEvent.clear()
self.dataEvent.clear()
......@@ -298,6 +377,10 @@ class TDTestCase:
test2.join()
test3.join()
while not q.empty():
if (q.get() != 0):
tdLog.exit("failed to end of test")
tdLog.info("end of test")
def stop(self):
......
......@@ -14,6 +14,7 @@
import sys
import random
import threading
import queue
from util.log import *
from util.cases import *
......@@ -24,13 +25,16 @@ last_tb = ""
last_stb = ""
written = 0
last_timestamp = 0
colAdded = False
killed = False
class Test (threading.Thread):
def __init__(self, threadId, name):
def __init__(self, threadId, name, q):
threading.Thread.__init__(self)
self.threadId = threadId
self.name = name
self.q = q
self.threadLock = threading.Lock()
......@@ -38,11 +42,12 @@ class Test (threading.Thread):
tdLog.info("create_table")
global last_tb
global written
global killed
current_tb = "tb%d" % int(round(time.time() * 1000))
if (current_tb == last_tb):
return
return 0
else:
tdLog.info("will create table %s" % current_tb)
......@@ -52,8 +57,14 @@ class Test (threading.Thread):
current_tb)
last_tb = current_tb
written = 0
killed = False
except Exception as e:
tdLog.info(repr(e))
tdLog.info("killed: %d error: %s" % (killed, e.args[0]))
if killed and (e.args[0] == 'network unavailable'):
tdLog.info("database killed, expect failed")
return 0
return -1
return 0
def insert_data(self):
tdLog.info("insert_data")
......@@ -75,22 +86,34 @@ class Test (threading.Thread):
for j in range(0, insertRows):
if (last_tb == ""):
tdLog.info("no table, return")
return
tdSql.execute(
'insert into %s values (%d + %da, %d, "test")' %
(last_tb, start_time, last_timestamp, last_timestamp))
written = written + 1
last_timestamp = last_timestamp + 1
return 0
try:
tdSql.execute(
'insert into %s values (%d + %da, %d, "test")' %
(last_tb, start_time, last_timestamp, last_timestamp))
written = written + 1
last_timestamp = last_timestamp + 1
except Exception as e:
if killed:
tdLog.info(
"database killed, expect failed %s" %
e.args[0])
return 0
tdLog.info(repr(e))
return -1
return 0
def query_data(self):
tdLog.info("query_data")
global last_tb
global written
global killed
if (written > 0):
if not killed and last_tb != "":
tdLog.info("query data from table")
tdSql.query("select * from %s" % last_tb)
tdSql.checkRows(written)
return 0
def create_stable(self):
tdLog.info("create_stable")
......@@ -101,9 +124,7 @@ class Test (threading.Thread):
current_stb = "stb%d" % int(round(time.time() * 1000))
if (current_stb == last_stb):
return
else:
if (current_stb != last_stb):
tdLog.info("will create stable %s" % current_stb)
tdLog.info(
'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' %
......@@ -131,6 +152,8 @@ class Test (threading.Thread):
written = written + 1
last_timestamp = last_timestamp + 1
return 0
def drop_stable(self):
tdLog.info("drop_stable")
global last_stb
......@@ -139,31 +162,63 @@ class Test (threading.Thread):
if (last_stb == ""):
tdLog.info("no super table")
return
else:
tdLog.info("will drop last super table")
tdLog.info("will drop last super table %s" % last_stb)
tdSql.execute('drop table %s' % last_stb)
last_stb = ""
last_tb = ""
written = 0
return 0
def alter_table_to_add_col(self):
tdLog.info("alter_table_to_add_col")
global last_stb
global colAdded
if last_stb != "" and not colAdded:
tdSql.execute(
"alter table %s add column col binary(20)" %
last_stb)
colAdded = True
return 0
def alter_table_to_drop_col(self):
tdLog.info("alter_table_to_drop_col")
global last_stb
global colAdded
if last_stb != "" and colAdded:
tdSql.execute("alter table %s drop column col" % last_stb)
colAdded = False
return 0
def restart_database(self):
tdLog.info("restart_database")
global last_tb
global written
global killed
tdDnodes.stop(1)
killed = True
tdDnodes.start(1)
# tdLog.sleep(5)
tdLog.sleep(10)
killed = False
return 0
def force_restart_database(self):
tdLog.info("force_restart_database")
global last_tb
global written
global killed
tdDnodes.forcestop(1)
last_tb = ""
written = 0
killed = True
tdDnodes.start(1)
# tdLog.sleep(10)
killed = False
return 0
def drop_table(self):
tdLog.info("drop_table")
......@@ -176,6 +231,7 @@ class Test (threading.Thread):
tdSql.execute("drop table %s" % last_tb)
last_tb = ""
written = 0
return 0
def query_data_from_stable(self):
tdLog.info("query_data_from_stable")
......@@ -183,10 +239,10 @@ class Test (threading.Thread):
if (last_stb == ""):
tdLog.info("no super table")
return
else:
tdLog.info("will query data from super table")
tdSql.execute('select * from %s' % last_stb)
return 0
def reset_query_cache(self):
tdLog.info("reset_query_cache")
......@@ -196,39 +252,45 @@ class Test (threading.Thread):
tdLog.info("reset query cache")
tdSql.execute("reset query cache")
# tdLog.sleep(1)
return 0
def reset_database(self):
tdLog.info("reset_database")
global last_tb
global last_stb
global written
global killed
tdDnodes.forcestop(1)
killed = True
tdDnodes.deploy(1)
tdDnodes.start(1)
tdSql.prepare()
last_tb = ""
last_stb = ""
written = 0
killed = False
return 0
def delete_datafiles(self):
tdLog.info("delete_data_files")
global last_tb
global last_stb
global written
global killed
dnodesDir = tdDnodes.getDnodesRootDir()
tdDnodes.forcestop(1)
killed = True
dataDir = dnodesDir + '/dnode1/data/*'
deleteCmd = 'rm -rf %s' % dataDir
os.system(deleteCmd)
tdDnodes.start(1)
tdSql.prepare()
last_tb = ""
last_stb = ""
written = 0
tdDnodes.start(1)
tdSql.prepare()
killed = False
return 0
def run(self):
dataOp = {
1: self.insert_data,
......@@ -246,6 +308,8 @@ class Test (threading.Thread):
7: self.reset_database,
8: self.delete_datafiles,
9: self.drop_stable,
10: self.alter_table_to_add_col,
11: self.alter_table_to_drop_col,
}
if (self.threadId == 1):
......@@ -253,16 +317,38 @@ class Test (threading.Thread):
self.threadLock.acquire()
tdLog.notice("first thread")
randDataOp = random.randint(1, 3)
dataOp.get(randDataOp, lambda: "ERROR")()
self.threadLock.release()
ret1 = dataOp.get(randDataOp, lambda: "ERROR")()
if ret1 == -1:
self.q.put(-1)
tdLog.exit("first thread failed")
else:
self.q.put(1)
if (self.q.get() != -2):
self.threadLock.release()
else:
self.q.put(-1)
tdLog.exit("second thread failed, first thread exit too")
elif (self.threadId == 2):
while True:
tdLog.notice("second thread")
self.threadLock.acquire()
randDbOp = random.randint(1, 9)
dbOp.get(randDbOp, lambda: "ERROR")()
self.threadLock.release()
tdLog.notice("second thread")
randDbOp = random.randint(1, 11)
ret2 = dbOp.get(randDbOp, lambda: "ERROR")()
if ret2 == -1:
self.q.put(-2)
tdLog.exit("second thread failed")
else:
self.q.put(2)
if (self.q.get() != -1):
self.threadLock.release()
else:
self.q.put(-2)
tdLog.exit("first thread failed, second exit too")
class TDTestCase:
......@@ -273,14 +359,19 @@ class TDTestCase:
def run(self):
tdSql.prepare()
test1 = Test(1, "data operation")
test2 = Test(2, "db operation")
q = queue.Queue()
test1 = Test(1, "data operation", q)
test2 = Test(2, "db operation", q)
test1.start()
test2.start()
test1.join()
test2.join()
while not q.empty():
if (q.get() != 0):
tdLog.exit("failed to end of test")
tdLog.info("end of test")
def stop(self):
......
......@@ -25,6 +25,7 @@ class Test:
self.last_tb = ""
self.last_stb = ""
self.written = 0
self.colAdded = False
def create_table(self):
tdLog.info("create_table")
......@@ -39,6 +40,7 @@ class Test:
current_tb)
self.last_tb = current_tb
self.written = 0
self.colAdded = False
def insert_data(self):
tdLog.info("insert_data")
......@@ -50,28 +52,52 @@ class Test:
insertRows = 10
tdLog.info("insert %d rows to %s" % (insertRows, self.last_tb))
for i in range(0, insertRows):
ret = tdSql.execute(
'insert into %s values (now + %dm, %d, "%s")' %
(self.last_tb, i, i, "->" + str(i)))
if self.colAdded:
ret = tdSql.execute(
'insert into %s values (now + %dm, %d, "%s", "%s")' %
(self.last_tb, i, i, "->" + str(i)), "col")
else:
ret = tdSql.execute(
'insert into %s values (now + %dm, %d, "%s")' %
(self.last_tb, i, i, "->" + str(i)))
self.written = self.written + 1
tdLog.info("insert earlier data")
tdSql.execute(
'insert into %s values (now - 5m , 10, " - 5m")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute(
'insert into %s values (now - 6m , 10, " - 6m")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute(
'insert into %s values (now - 7m , 10, " - 7m")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute(
'insert into %s values (now - 8m , 10, " - 8m")' %
self.last_tb)
self.written = self.written + 1
if self.colAdded:
tdSql.execute(
'insert into %s values (now - 5m , 10, " - 5m", "col")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute(
'insert into %s values (now - 6m , 10, " - 6m", "col")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute(
'insert into %s values (now - 7m , 10, " - 7m", "col")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute(
'insert into %s values (now - 8m , 10, " - 8m", "col")' %
self.last_tb)
self.written = self.written + 1
else:
tdSql.execute(
'insert into %s values (now - 5m , 10, " - 5m")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute(
'insert into %s values (now - 6m , 10, " - 6m")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute(
'insert into %s values (now - 7m , 10, " - 7m")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute(
'insert into %s values (now - 8m , 10, " - 8m")' %
self.last_tb)
self.written = self.written + 1
def query_data(self):
tdLog.info("query_data")
......@@ -88,21 +114,48 @@ class Test:
return
else:
tdLog.info("will create stable %s" % current_stb)
db = "db"
tdSql.execute("drop database if exists %s" % (db))
tdSql.execute("reset query cache")
tdSql.execute("create database %s maxrows 200 maxtables 30" % (db))
tdSql.execute("use %s" % (db))
tdSql.execute(
'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' %
current_stb)
self.last_stb = current_stb
self.colAdded = False
for k in range(1, 300):
current_tb = "tb%d" % int(round(time.time() * 1000))
sqlcmd = "create table %s using %s tags (1, 'test')" % (
current_tb, self.last_stb)
tdSql.execute(sqlcmd)
self.last_tb = current_tb
self.written = 0
for j in range(1, 100):
tdSql.execute(
"insert into %s values (now + %da, 27, 'wsnchar')" %
(self.last_tb, j))
self.written = self.written + 1
def alter_table_to_add_col(self):
tdLog.info("alter_table_to_add_col")
if self.last_stb != "" and not self.colAdded:
tdSql.execute(
"alter table %s add column col binary(20)" %
self.last_stb)
self.colAdded = True
current_tb = "tb%d" % int(round(time.time() * 1000))
sqlcmd = "create table %s using %s tags (1, 'test')" %(current_tb, self.last_stb)
tdSql.execute(sqlcmd)
self.last_tb = current_tb
self.written = 0
def alter_table_to_drop_col(self):
tdLog.info("alter_table_to_drop_col")
tdSql.execute(
"insert into %s values (now, 27, 'wsnchar')" %
self.last_tb)
self.written = self.written + 1
if self.last_stb != "" and self.colAdded:
tdSql.execute("alter table %s drop column col" % self.last_stb)
self.colAdded = False
def drop_stable(self):
tdLog.info("drop_stable")
......@@ -126,16 +179,16 @@ class Test:
tdSql.execute('select * from %s' % self.last_stb)
def restart_database(self):
tdLog.info("restart_databae")
tdLog.info("restart_database")
tdDnodes.stop(1)
tdDnodes.start(1)
tdLog.sleep(5)
tdLog.sleep(10)
def force_restart_database(self):
tdLog.info("force_restart_database")
tdDnodes.forcestop(1)
tdDnodes.start(1)
tdLog.sleep(5)
tdLog.sleep(10)
tdSql.prepare()
self.last_tb = ""
self.last_stb = ""
......@@ -161,7 +214,7 @@ class Test:
self.last_tb = ""
self.written = 0
tdDnodes.start(1)
tdLog.sleep(5)
tdLog.sleep(10)
tdSql.prepare()
self.last_tb = ""
self.last_stb = ""
......@@ -209,10 +262,12 @@ class TDTestCase:
10: test.delete_datafiles,
11: test.query_data_from_stable,
12: test.drop_stable,
13: test.alter_table_to_add_col,
14: test.alter_table_to_drop_col,
}
for x in range(1, 1000):
r = random.randint(1, 12)
r = random.randint(1, 14)
tdLog.notice("iteration %d run func %d" % (x, r))
switch.get(r, lambda: "ERROR")()
......
......@@ -126,7 +126,7 @@ class Test:
def delete_datafiles(self):
tdLog.info("delete data files")
dnodesDir = tdDnodes.getDnodesRootDir()
dataDir = dnodesDir + '/dnode1/*'
dataDir = dnodesDir + '/dnode1/data/*'
deleteCmd = 'rm -rf %s' % dataDir
os.system(deleteCmd)
......
......@@ -41,16 +41,12 @@ class TDSql:
def prepare(self):
tdLog.info("prepare database:db")
s = 'reset query cache'
print(s)
self.cursor.execute(s)
s = 'drop database if exists db'
print(s)
self.cursor.execute(s)
s = 'create database db'
print(s)
self.cursor.execute(s)
s = 'use db'
print(s)
self.cursor.execute(s)
def error(self, sql):
......@@ -74,7 +70,6 @@ class TDSql:
def query(self, sql):
self.sql = sql
print(sql)
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
......@@ -191,7 +186,6 @@ class TDSql:
def execute(self, sql):
self.sql = sql
print(sql)
self.affectedRows = self.cursor.execute(sql)
return self.affectedRows
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册