提交 2591b988 编写于 作者: S Shuduo Sang

support queue in multi-thread version.

上级 d26d41e7
......@@ -24,6 +24,7 @@ last_tb = ""
last_stb = ""
written = 0
last_timestamp = 0
colAdded = False
class Test (Thread):
......@@ -140,6 +141,26 @@ class Test (Thread):
last_tb = ""
written = 0
def alter_table_to_add_col(self):
tdLog.info("alter_table_to_add_col")
global last_stb
global colAdded
if last_stb != "" and colAdded == False:
tdSql.execute(
"alter table %s add column col binary(20)" %
last_stb)
colAdded = True
def alter_table_to_drop_col(self):
tdLog.info("alter_table_to_drop_col")
global last_stb
global colAdded
if last_stb != "" and colAdded:
tdSql.execute("alter table %s drop column col" % last_stb)
colAdded = False
def restart_database(self):
tdLog.info("restart_database")
global last_tb
......@@ -235,6 +256,8 @@ class Test (Thread):
7: self.reset_database,
8: self.delete_datafiles,
9: self.drop_stable,
10: self.alter_table_to_add_col,
11: self.alter_table_to_drop_col,
}
queryOp = {
......@@ -256,7 +279,7 @@ class Test (Thread):
while True:
self.dbEvent.wait()
tdLog.notice("second thread")
randDbOp = random.randint(1, 9)
randDbOp = random.randint(1, 11)
dbOp.get(randDbOp, lambda: "ERROR")()
self.dbEvent.clear()
self.dataEvent.clear()
......
......@@ -14,6 +14,7 @@
import sys
import random
import threading
import queue
from util.log import *
from util.cases import *
......@@ -24,13 +25,16 @@ last_tb = ""
last_stb = ""
written = 0
last_timestamp = 0
colAdded = False
killed = False
class Test (threading.Thread):
def __init__(self, threadId, name):
def __init__(self, threadId, name, q):
threading.Thread.__init__(self)
self.threadId = threadId
self.name = name
self.q = q
self.threadLock = threading.Lock()
......@@ -38,11 +42,12 @@ class Test (threading.Thread):
tdLog.info("create_table")
global last_tb
global written
global killed
current_tb = "tb%d" % int(round(time.time() * 1000))
if (current_tb == last_tb):
return
return 0
else:
tdLog.info("will create table %s" % current_tb)
......@@ -52,8 +57,14 @@ class Test (threading.Thread):
current_tb)
last_tb = current_tb
written = 0
killed = False
except Exception as e:
tdLog.info(repr(e))
tdLog.info("killed: %d error: %s" % (killed, e.args[0]))
if killed and (e.args[0] == 'network unavailable'):
tdLog.info("database killed, expect failed")
return 0
return -1
return 0
def insert_data(self):
tdLog.info("insert_data")
......@@ -75,22 +86,34 @@ class Test (threading.Thread):
for j in range(0, insertRows):
if (last_tb == ""):
tdLog.info("no table, return")
return
tdSql.execute(
'insert into %s values (%d + %da, %d, "test")' %
(last_tb, start_time, last_timestamp, last_timestamp))
written = written + 1
last_timestamp = last_timestamp + 1
return 0
try:
tdSql.execute(
'insert into %s values (%d + %da, %d, "test")' %
(last_tb, start_time, last_timestamp, last_timestamp))
written = written + 1
last_timestamp = last_timestamp + 1
except Exception as e:
if killed:
tdLog.info(
"database killed, expect failed %s" %
e.args[0])
return 0
tdLog.info(repr(e))
return -1
return 0
def query_data(self):
tdLog.info("query_data")
global last_tb
global written
global killed
if (written > 0):
if not killed and last_tb != "":
tdLog.info("query data from table")
tdSql.query("select * from %s" % last_tb)
tdSql.checkRows(written)
return 0
def create_stable(self):
tdLog.info("create_stable")
......@@ -101,9 +124,7 @@ class Test (threading.Thread):
current_stb = "stb%d" % int(round(time.time() * 1000))
if (current_stb == last_stb):
return
else:
if (current_stb != last_stb):
tdLog.info("will create stable %s" % current_stb)
tdLog.info(
'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' %
......@@ -131,6 +152,8 @@ class Test (threading.Thread):
written = written + 1
last_timestamp = last_timestamp + 1
return 0
def drop_stable(self):
tdLog.info("drop_stable")
global last_stb
......@@ -139,31 +162,63 @@ class Test (threading.Thread):
if (last_stb == ""):
tdLog.info("no super table")
return
else:
tdLog.info("will drop last super table")
tdLog.info("will drop last super table %s" % last_stb)
tdSql.execute('drop table %s' % last_stb)
last_stb = ""
last_tb = ""
written = 0
return 0
def alter_table_to_add_col(self):
tdLog.info("alter_table_to_add_col")
global last_stb
global colAdded
if last_stb != "" and colAdded == False:
tdSql.execute(
"alter table %s add column col binary(20)" %
last_stb)
colAdded = True
return 0
def alter_table_to_drop_col(self):
tdLog.info("alter_table_to_drop_col")
global last_stb
global colAdded
if last_stb != "" and not colAdded:
tdSql.execute("alter table %s drop column col" % last_stb)
colAdded = False
return 0
def restart_database(self):
tdLog.info("restart_database")
global last_tb
global written
global killed
tdDnodes.stop(1)
killed = True
tdDnodes.start(1)
# tdLog.sleep(5)
tdLog.sleep(10)
killed = False
return 0
def force_restart_database(self):
tdLog.info("force_restart_database")
global last_tb
global written
global killed
tdDnodes.forcestop(1)
last_tb = ""
written = 0
killed = True
tdDnodes.start(1)
# tdLog.sleep(10)
killed = False
return 0
def drop_table(self):
tdLog.info("drop_table")
......@@ -176,6 +231,7 @@ class Test (threading.Thread):
tdSql.execute("drop table %s" % last_tb)
last_tb = ""
written = 0
return 0
def query_data_from_stable(self):
tdLog.info("query_data_from_stable")
......@@ -183,10 +239,10 @@ class Test (threading.Thread):
if (last_stb == ""):
tdLog.info("no super table")
return
else:
tdLog.info("will query data from super table")
tdSql.execute('select * from %s' % last_stb)
return 0
def reset_query_cache(self):
tdLog.info("reset_query_cache")
......@@ -196,38 +252,44 @@ class Test (threading.Thread):
tdLog.info("reset query cache")
tdSql.execute("reset query cache")
# tdLog.sleep(1)
return 0
def reset_database(self):
tdLog.info("reset_database")
global last_tb
global last_stb
global written
global killed
tdDnodes.forcestop(1)
killed = True
tdDnodes.deploy(1)
tdDnodes.start(1)
tdSql.prepare()
last_tb = ""
last_stb = ""
written = 0
killed = False
return 0
def delete_datafiles(self):
tdLog.info("delete_data_files")
global last_tb
global last_stb
global written
global killed
dnodesDir = tdDnodes.getDnodesRootDir()
tdDnodes.forcestop(1)
dataDir = dnodesDir + '/dnode1/data/*'
deleteCmd = 'rm -rf %s' % dataDir
os.system(deleteCmd)
tdDnodes.start(1)
tdSql.prepare()
last_tb = ""
last_stb = ""
written = 0
killed = True
tdDnodes.start(1)
tdSql.prepare()
killed = False
return 0
def run(self):
dataOp = {
......@@ -246,6 +308,8 @@ class Test (threading.Thread):
7: self.reset_database,
8: self.delete_datafiles,
9: self.drop_stable,
10: self.alter_table_to_add_col,
11: self.alter_table_to_drop_col,
}
if (self.threadId == 1):
......@@ -253,16 +317,38 @@ class Test (threading.Thread):
self.threadLock.acquire()
tdLog.notice("first thread")
randDataOp = random.randint(1, 3)
dataOp.get(randDataOp, lambda: "ERROR")()
self.threadLock.release()
ret1 = dataOp.get(randDataOp, lambda: "ERROR")()
if ret1 == -1:
self.q.put(-1)
tdLog.exit("first thread failed")
else:
self.q.put(1)
if (self.q.get() != -2):
self.threadLock.release()
else:
self.q.put(-1)
tdLog.exit("second thread failed, first thread exit too")
elif (self.threadId == 2):
while True:
tdLog.notice("second thread")
self.threadLock.acquire()
randDbOp = random.randint(1, 9)
dbOp.get(randDbOp, lambda: "ERROR")()
self.threadLock.release()
tdLog.notice("second thread")
randDbOp = random.randint(1, 11)
ret2 = dbOp.get(randDbOp, lambda: "ERROR")()
if ret2 == -1:
self.q.put(-2)
tdLog.exit("second thread failed")
else:
self.q.put(2)
if (self.q.get() != -1):
self.threadLock.release()
else:
self.q.put(-2)
tdLog.exit("first thread failed, second exit too")
class TDTestCase:
......@@ -273,14 +359,19 @@ class TDTestCase:
def run(self):
tdSql.prepare()
test1 = Test(1, "data operation")
test2 = Test(2, "db operation")
q = queue.Queue()
test1 = Test(1, "data operation", q)
test2 = Test(2, "db operation", q)
test1.start()
test2.start()
test1.join()
test2.join()
while not q.empty():
if (q.get() != 0):
tdLog.exit("failed to end of test")
tdLog.info("end of test")
def stop(self):
......
......@@ -126,7 +126,7 @@ class Test:
def delete_datafiles(self):
tdLog.info("delete data files")
dnodesDir = tdDnodes.getDnodesRootDir()
dataDir = dnodesDir + '/dnode1/*'
dataDir = dnodesDir + '/dnode1/data/*'
deleteCmd = 'rm -rf %s' % dataDir
os.system(deleteCmd)
......
......@@ -41,16 +41,12 @@ class TDSql:
def prepare(self):
tdLog.info("prepare database:db")
s = 'reset query cache'
print(s)
self.cursor.execute(s)
s = 'drop database if exists db'
print(s)
self.cursor.execute(s)
s = 'create database db'
print(s)
self.cursor.execute(s)
s = 'use db'
print(s)
self.cursor.execute(s)
def error(self, sql):
......@@ -74,7 +70,6 @@ class TDSql:
def query(self, sql):
self.sql = sql
print(sql)
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
......@@ -191,7 +186,6 @@ class TDSql:
def execute(self, sql):
self.sql = sql
print(sql)
self.affectedRows = self.cursor.execute(sql)
return self.affectedRows
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册