diff --git a/tests/pytest/concurrent_inquiry.py b/tests/pytest/concurrent_inquiry.py index 52ae33cfa7918d1b138657d6446bcf36f0de6ee9..03a7fdb86a06839342daebb2df29b6781d8951dd 100644 --- a/tests/pytest/concurrent_inquiry.py +++ b/tests/pytest/concurrent_inquiry.py @@ -17,6 +17,7 @@ import json import time import random import requests +import argparse from requests.auth import HTTPBasicAuth func_list=['avg','count','twa','sum','stddev','leastsquares','min', 'max','first','last','top','bottom','percentile','apercentile', @@ -32,19 +33,33 @@ condition_list=[ ] where_list = ['_c0>now-10d',' <50'," like \'%a%\'"] class ConcurrentInquiry: - def __init__(self,n_Therads=25,r_Therads=25): + # def __init__(self,ts=1500000001000,host='127.0.0.1',user='root',password='taosdata',dbname='test', + # stb_prefix='st',subtb_prefix='t',n_Therads=10,r_Therads=10,probabilities=0.05,loop=5, + # stableNum = 2,subtableNum = 1000,insertRows = 100): + def __init__(self,ts,host,user,password,dbname, + stb_prefix,subtb_prefix,n_Therads,r_Therads,probabilities,loop, + stableNum ,subtableNum ,insertRows ): self.n_numOfTherads = n_Therads self.r_numOfTherads = r_Therads - self.ts=1500000001000 - self.dbname='test' + self.ts=ts + self.host = host + self.user = user + self.password = password + self.dbname=dbname + self.stb_prefix = stb_prefix + self.subtb_prefix = subtb_prefix self.stb_list=[] self.subtb_list=[] self.stb_stru_list=[] self.subtb_stru_list=[] self.stb_tag_list=[] self.subtb_tag_list=[] - self.probabilities = [0.05,0.95] + self.probabilities = [probabilities,1-probabilities] self.ifjoin = [0,1] + self.loop = loop + self.stableNum = stableNum + self.subtableNum = subtableNum + self.insertRows = insertRows def SetThreadsNum(self,num): self.numOfTherads=num @@ -88,9 +103,9 @@ class ConcurrentInquiry: self.subtb_tag_list.append(tag) def get_full(self): #获取所有的表、表结构 - host = "127.0.0.1" - user = "root" - password = "taosdata" + host = self.host + user = self.user + password = self.password conn = taos.connect( host, user, @@ -117,7 +132,7 @@ class ConcurrentInquiry: return 'where '+random.choice([' and ',' or ']).join(l) def con_interval(self,tlist,col_list,tag_list): - interval = 'interval(' + str(random.randint(0,100)) + random.choice(['a','s','d','w','n','y']) + ')' + interval = 'interval(' + str(random.randint(0,20)) + random.choice(['a','s','d','w','n','y']) + ')' return interval def con_limit(self,tlist,col_list,tag_list): @@ -133,7 +148,7 @@ class ConcurrentInquiry: def con_group(self,tlist,col_list,tag_list): rand_tag = random.randint(0,5) rand_col = random.randint(0,1) - return 'group by '+','.join(random.sample(col_list,rand_col))+','.join(random.sample(tag_list,rand_tag)) + return 'group by '+','.join(random.sample(col_list,rand_col) + random.sample(tag_list,rand_tag)) def con_order(self,tlist,col_list,tag_list): return 'order by '+random.choice(tlist) @@ -165,8 +180,10 @@ class ConcurrentInquiry: random.shuffle(func_list) sel_col_list=[] col_rand=random.randint(0,len(col_list)) + loop = 0 for i,j in zip(col_list[0:col_rand],func_list): #决定每个被查询col的函数 - alias = ' as '+ str(i) + alias = ' as '+ 'taos%d ' % loop + loop += 1 pick_func = '' if j == 'leastsquares': pick_func=j+'('+i+',1,1)' @@ -185,7 +202,7 @@ class ConcurrentInquiry: for i in sel_con: sel_con_list.append(i(tlist,col_list,tag_list)) #获取对应的条件函数 sql+=' '.join(sel_con_list) # condition - print(sql) + #print(sql) return sql def gen_query_join(self): #生成join查询语句 @@ -236,8 +253,6 @@ class ConcurrentInquiry: else: join_section = ''.join(random.choices(col_intersection+tag_intersection)) sql += 'where t1._c0 = t2._c0 and ' + 't1.' + join_section + '=t2.' + join_section - - print(sql) return sql def random_pick(self): @@ -248,16 +263,48 @@ class ConcurrentInquiry: if x < cumulative_probability:break return item - + def gen_data(self): + stableNum = self.stableNum + subtableNum = self.subtableNum + insertRows = self.insertRows + t0 = self.ts + host = self.host + user = self.user + password = self.password + conn = taos.connect( + host, + user, + password, + ) + cl = conn.cursor() + cl.execute("drop database if exists %s;" %self.dbname) + cl.execute("create database if not exists %s;" %self.dbname) + cl.execute("use %s" % self.dbname) + for k in range(stableNum): + sql="create table %s (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20)) \ + tags(t1 int, t2 float, t3 bigint, t4 smallint, t5 tinyint, t6 double, t7 bool,t8 binary(20),t9 nchar(20))" % (self.stb_prefix+str(k)) + cl.execute(sql) + for j in range(subtableNum): + sql = "create table %s using %s tags(%d,%d,%d,%d,%d,%d,%d,'%s','%s')" % \ + (self.subtb_prefix+str(k)+'_'+str(j),self.stb_prefix+str(k),j,j/2.0,j%41,j%51,j%53,j*1.0,j%2,'taos'+str(j),'涛思'+str(j)) + print(sql) + cl.execute(sql) + for i in range(insertRows): + ret = cl.execute( + "insert into %s values (%d , %d,%d,%d,%d,%d,%d,%d,'%s','%s')" % + (self.subtb_prefix+str(k)+'_'+str(j),t0+i,i%100,i/2.0,i%41,i%51,i%53,i*1.0,i%2,'taos'+str(i),'涛思'+str(i))) + cl.close() + conn.close() + def rest_query(self,sql): #rest 接口 - host = "127.0.0.1" - user = "root" - password = "taosdata" + host = self.host + user = self.user + password = self.password port =6041 url = "http://{}:{}/rest/sql".format(host, port ) try: r = requests.post(url, - data = 'use test', + data = 'use %s' % self.dbname, auth = HTTPBasicAuth('root', 'taosdata')) r = requests.post(url, data = sql, @@ -287,20 +334,20 @@ class ConcurrentInquiry: def query_thread_n(self,threadID): #使用原生python接口查询 - host = "127.0.0.1" - user = "root" - password = "taosdata" + host = self.host + user = self.user + password = self.password conn = taos.connect( host, user, password, ) cl = conn.cursor() - cl.execute("use test;") + cl.execute("use %s;" % self.dbname) print("Thread %d: starting" % threadID) - - while True: + loop = self.loop + while loop: try: if self.random_pick(): @@ -314,33 +361,40 @@ class ConcurrentInquiry: end = time.time() print("time cost :",end-start) except Exception as e: + print('-'*40) print( - "Failure thread%d, sql: %s,exception: %s" % + "Failure thread%d, sql: %s \nexception: %s" % (threadID, str(sql),str(e))) #exit(-1) + loop -= 1 + if loop == 0: break - + cl.close() + conn.close() print("Thread %d: finishing" % threadID) def query_thread_r(self,threadID): #使用rest接口查询 print("Thread %d: starting" % threadID) - while True: - try: - if self.random_pick(): - sql=self.gen_query_sql() - else: - sql=self.gen_query_join() - print("sql is ",sql) - start = time.time() - self.rest_query(sql) - end = time.time() - print("time cost :",end-start) - except Exception as e: - print( - "Failure thread%d, sql: %s,exception: %s" % - (threadID, str(sql),str(e))) - #exit(-1) - + loop = self.loop + while loop: + try: + if self.random_pick(): + sql=self.gen_query_sql() + else: + sql=self.gen_query_join() + print("sql is ",sql) + start = time.time() + self.rest_query(sql) + end = time.time() + print("time cost :",end-start) + except Exception as e: + print('-'*40) + print( + "Failure thread%d, sql: %s \nexception: %s" % + (threadID, str(sql),str(e))) + #exit(-1) + loop -= 1 + if loop == 0: break print("Thread %d: finishing" % threadID) @@ -355,10 +409,124 @@ class ConcurrentInquiry: thread = threading.Thread(target=self.query_thread_r, args=(i,)) threads.append(thread) thread.start() -if len(sys.argv)>1: - q = ConcurrentInquiry(n_Therads=sys.argv[1],r_Therads=sys.argv[2]) -else: - q = ConcurrentInquiry() + +parser = argparse.ArgumentParser() +parser.add_argument( + '-H', + '--host-name', + action='store', + default='127.0.0.1', + type=str, + help='host name to be connected (default: 127.0.0.1)') +parser.add_argument( + '-S', + '--ts', + action='store', + default=1500000000000, + type=int, + help='insert data from timestamp (default: 1500000000000)') +parser.add_argument( + '-d', + '--db-name', + action='store', + default='test', + type=str, + help='Database name to be created (default: test)') +parser.add_argument( + '-t', + '--number-of-native-threads', + action='store', + default=10, + type=int, + help='Number of native threads (default: 10)') +parser.add_argument( + '-T', + '--number-of-rest-threads', + action='store', + default=10, + type=int, + help='Number of rest threads (default: 10)') +parser.add_argument( + '-r', + '--number-of-records', + action='store', + default=100, + type=int, + help='Number of record to be created for each table (default: 100)') +parser.add_argument( + '-c', + '--create-table', + action='store', + default='0', + type=int, + help='whether gen data (default: 0)') +parser.add_argument( + '-p', + '--subtb-name-prefix', + action='store', + default='t', + type=str, + help='subtable-name-prefix (default: t)') +parser.add_argument( + '-P', + '--stb-name-prefix', + action='store', + default='st', + type=str, + help='stable-name-prefix (default: st)') +parser.add_argument( + '-b', + '--probabilities', + action='store', + default='0.05', + type=float, + help='probabilities of join (default: 0.05)') +parser.add_argument( + '-l', + '--loop-per-thread', + action='store', + default='100', + type=int, + help='loop per thread (default: 100)') +parser.add_argument( + '-u', + '--user', + action='store', + default='root', + type=str, + help='user name') +parser.add_argument( + '-w', + '--password', + action='store', + default='root', + type=str, + help='user name') +parser.add_argument( + '-n', + '--number-of-tables', + action='store', + default=1000, + type=int, + help='Number of subtales per stable (default: 1000)') +parser.add_argument( + '-N', + '--number-of-stables', + action='store', + default=2, + type=int, + help='Number of stables (default: 2)') + +args = parser.parse_args() +q = ConcurrentInquiry( + args.ts,args.host_name,args.user,args.password,args.db_name, + args.stb_name_prefix,args.subtb_name_prefix,args.number_of_native_threads,args.number_of_rest_threads, + args.probabilities,args.loop_per_thread,args.number_of_stables,args.number_of_tables ,args.number_of_records ) + +if args.create_table: + q.gen_data() q.get_full() + #q.gen_query_sql() q.run() + diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index ed4fcf754fb00d5bae3f8927f091f744b5b33551..38e9e01870ea91bfab107ad8039cdb7bc34cf148 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -255,17 +255,24 @@ class TDDnode: tdLog.exit(cmd) self.running = 1 tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) - time.sleep(0.1) - key = 'from offline to online' - bkey = bytes(key,encoding="utf8") - logFile = self.logDir + "/taosdlog.0" - popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) - while True: - line = popen.stdout.readline().strip() - if bkey in line: - popen.kill() - break - tdLog.debug("the dnode:%d has been started." % (self.index)) + if self.valgrind == 0: + time.sleep(0.1) + key = 'from offline to online' + bkey = bytes(key,encoding="utf8") + logFile = self.logDir + "/taosdlog.0" + popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + pid = popen.pid + print('Popen.pid:' + str(pid)) + while True: + line = popen.stdout.readline().strip() + if bkey in line: + print(line) + popen.kill() + break + tdLog.debug("the dnode:%d has been started." % (self.index)) + else: + tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) + time.sleep(5) # time.sleep(5)