提交 304b1374 编写于 作者: L liuyq-617

[TD-2841]<test>sql record & replay

上级 3d407ab5
...@@ -40,7 +40,7 @@ class ConcurrentInquiry: ...@@ -40,7 +40,7 @@ class ConcurrentInquiry:
# stableNum = 2,subtableNum = 1000,insertRows = 100): # stableNum = 2,subtableNum = 1000,insertRows = 100):
def __init__(self,ts,host,user,password,dbname, def __init__(self,ts,host,user,password,dbname,
stb_prefix,subtb_prefix,n_Therads,r_Therads,probabilities,loop, stb_prefix,subtb_prefix,n_Therads,r_Therads,probabilities,loop,
stableNum ,subtableNum ,insertRows ,mix_table): stableNum ,subtableNum ,insertRows ,mix_table, replay):
self.n_numOfTherads = n_Therads self.n_numOfTherads = n_Therads
self.r_numOfTherads = r_Therads self.r_numOfTherads = r_Therads
self.ts=ts self.ts=ts
...@@ -65,6 +65,7 @@ class ConcurrentInquiry: ...@@ -65,6 +65,7 @@ class ConcurrentInquiry:
self.mix_table = mix_table self.mix_table = mix_table
self.max_ts = datetime.datetime.now() self.max_ts = datetime.datetime.now()
self.min_ts = datetime.datetime.now() - datetime.timedelta(days=5) self.min_ts = datetime.datetime.now() - datetime.timedelta(days=5)
self.replay = replay
def SetThreadsNum(self,num): def SetThreadsNum(self,num):
self.numOfTherads=num self.numOfTherads=num
...@@ -412,7 +413,7 @@ class ConcurrentInquiry: ...@@ -412,7 +413,7 @@ class ConcurrentInquiry:
) )
cl = conn.cursor() cl = conn.cursor()
cl.execute("use %s;" % self.dbname) cl.execute("use %s;" % self.dbname)
fo = open('bak_sql_n_%d'%threadID,'w+')
print("Thread %d: starting" % threadID) print("Thread %d: starting" % threadID)
loop = self.loop loop = self.loop
while loop: while loop:
...@@ -423,6 +424,7 @@ class ConcurrentInquiry: ...@@ -423,6 +424,7 @@ class ConcurrentInquiry:
else: else:
sql=self.gen_query_join() sql=self.gen_query_join()
print("sql is ",sql) print("sql is ",sql)
fo.write(sql+'\n')
start = time.time() start = time.time()
cl.execute(sql) cl.execute(sql)
cl.fetchall() cl.fetchall()
...@@ -438,13 +440,49 @@ class ConcurrentInquiry: ...@@ -438,13 +440,49 @@ class ConcurrentInquiry:
exit(-1) exit(-1)
loop -= 1 loop -= 1
if loop == 0: break if loop == 0: break
fo.close()
cl.close() cl.close()
conn.close() conn.close()
print("Thread %d: finishing" % threadID) print("Thread %d: finishing" % threadID)
def query_thread_nr(self,threadID): #使用原生python接口进行重放
host = self.host
user = self.user
password = self.password
conn = taos.connect(
host,
user,
password,
)
cl = conn.cursor()
cl.execute("use %s;" % self.dbname)
replay_sql = []
with open('bak_sql_n_%d'%threadID,'r') as f:
replay_sql = f.readlines()
print("Replay Thread %d: starting" % threadID)
for sql in replay_sql:
try:
print("sql is ",sql)
start = time.time()
cl.execute(sql)
cl.fetchall()
end = time.time()
print("time cost :",end-start)
except Exception as e:
print('-'*40)
print(
"Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e)))
err_uec='Unable to establish connection'
if err_uec in str(e) and loop >0:
exit(-1)
cl.close()
conn.close()
print("Replay Thread %d: finishing" % threadID)
def query_thread_r(self,threadID): #使用rest接口查询 def query_thread_r(self,threadID): #使用rest接口查询
print("Thread %d: starting" % threadID) print("Thread %d: starting" % threadID)
fo = open('bak_sql_r_%d'%threadID,'w+')
loop = self.loop loop = self.loop
while loop: while loop:
try: try:
...@@ -453,6 +491,7 @@ class ConcurrentInquiry: ...@@ -453,6 +491,7 @@ class ConcurrentInquiry:
else: else:
sql=self.gen_query_join() sql=self.gen_query_join()
print("sql is ",sql) print("sql is ",sql)
fo.write(sql+'\n')
start = time.time() start = time.time()
self.rest_query(sql) self.rest_query(sql)
end = time.time() end = time.time()
...@@ -467,20 +506,53 @@ class ConcurrentInquiry: ...@@ -467,20 +506,53 @@ class ConcurrentInquiry:
exit(-1) exit(-1)
loop -= 1 loop -= 1
if loop == 0: break if loop == 0: break
fo.close()
print("Thread %d: finishing" % threadID) print("Thread %d: finishing" % threadID)
def query_thread_rr(self,threadID): #使用rest接口重放
print("Replay Thread %d: starting" % threadID)
replay_sql = []
with open('bak_sql_r_%d'%threadID,'r') as f:
replay_sql = f.readlines()
for sql in replay_sql:
try:
print("sql is ",sql)
start = time.time()
self.rest_query(sql)
end = time.time()
print("time cost :",end-start)
except Exception as e:
print('-'*40)
print(
"Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e)))
err_uec='Unable to establish connection'
if err_uec in str(e) and loop >0:
exit(-1)
print("Replay Thread %d: finishing" % threadID)
def run(self): def run(self):
print(self.n_numOfTherads,self.r_numOfTherads) print(self.n_numOfTherads,self.r_numOfTherads)
threads = [] threads = []
for i in range(self.n_numOfTherads): if self.replay: #whether replay
thread = threading.Thread(target=self.query_thread_n, args=(i,)) for i in range(self.n_numOfTherads):
threads.append(thread) thread = threading.Thread(target=self.query_thread_nr, args=(i,))
thread.start() threads.append(thread)
for i in range(self.r_numOfTherads): thread.start()
thread = threading.Thread(target=self.query_thread_r, args=(i,)) for i in range(self.r_numOfTherads):
threads.append(thread) thread = threading.Thread(target=self.query_thread_rr, args=(i,))
thread.start() threads.append(thread)
thread.start()
else:
for i in range(self.n_numOfTherads):
thread = threading.Thread(target=self.query_thread_n, args=(i,))
threads.append(thread)
thread.start()
for i in range(self.r_numOfTherads):
thread = threading.Thread(target=self.query_thread_r, args=(i,))
threads.append(thread)
thread.start()
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument(
...@@ -595,13 +667,20 @@ parser.add_argument( ...@@ -595,13 +667,20 @@ parser.add_argument(
default=0, default=0,
type=int, type=int,
help='0:stable & substable ,1:subtable ,2:stable (default: 0)') help='0:stable & substable ,1:subtable ,2:stable (default: 0)')
parser.add_argument(
'-R',
'--replay',
action='store',
default=0,
type=int,
help='0:not replay ,1:replay (default: 0)')
args = parser.parse_args() args = parser.parse_args()
q = ConcurrentInquiry( q = ConcurrentInquiry(
args.ts,args.host_name,args.user,args.password,args.db_name, args.ts,args.host_name,args.user,args.password,args.db_name,
args.stb_name_prefix,args.subtb_name_prefix,args.number_of_native_threads,args.number_of_rest_threads, args.stb_name_prefix,args.subtb_name_prefix,args.number_of_native_threads,args.number_of_rest_threads,
args.probabilities,args.loop_per_thread,args.number_of_stables,args.number_of_tables ,args.number_of_records, args.probabilities,args.loop_per_thread,args.number_of_stables,args.number_of_tables ,args.number_of_records,
args.mix_stable_subtable ) args.mix_stable_subtable, args.replay )
if args.create_table: if args.create_table:
q.gen_data() q.gen_data()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册