提交 8e2c3ed3 编写于 作者: L liuyq-617

fix some errors

上级 3bb2a769
...@@ -17,6 +17,7 @@ import json ...@@ -17,6 +17,7 @@ import json
import time import time
import random import random
import requests import requests
import argparse
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
func_list=['avg','count','twa','sum','stddev','leastsquares','min', func_list=['avg','count','twa','sum','stddev','leastsquares','min',
'max','first','last','top','bottom','percentile','apercentile', 'max','first','last','top','bottom','percentile','apercentile',
...@@ -32,19 +33,33 @@ condition_list=[ ...@@ -32,19 +33,33 @@ condition_list=[
] ]
where_list = ['_c0>now-10d',' <50'," like \'%a%\'"] where_list = ['_c0>now-10d',' <50'," like \'%a%\'"]
class ConcurrentInquiry: class ConcurrentInquiry:
def __init__(self,n_Therads=25,r_Therads=25): # def __init__(self,ts=1500000001000,host='127.0.0.1',user='root',password='taosdata',dbname='test',
# stb_prefix='st',subtb_prefix='t',n_Therads=10,r_Therads=10,probabilities=0.05,loop=5,
# stableNum = 2,subtableNum = 1000,insertRows = 100):
def __init__(self,ts,host,user,password,dbname,
stb_prefix,subtb_prefix,n_Therads,r_Therads,probabilities,loop,
stableNum ,subtableNum ,insertRows ):
self.n_numOfTherads = n_Therads self.n_numOfTherads = n_Therads
self.r_numOfTherads = r_Therads self.r_numOfTherads = r_Therads
self.ts=1500000001000 self.ts=ts
self.dbname='test' self.host = host
self.user = user
self.password = password
self.dbname=dbname
self.stb_prefix = stb_prefix
self.subtb_prefix = subtb_prefix
self.stb_list=[] self.stb_list=[]
self.subtb_list=[] self.subtb_list=[]
self.stb_stru_list=[] self.stb_stru_list=[]
self.subtb_stru_list=[] self.subtb_stru_list=[]
self.stb_tag_list=[] self.stb_tag_list=[]
self.subtb_tag_list=[] self.subtb_tag_list=[]
self.probabilities = [0.05,0.95] self.probabilities = [probabilities,1-probabilities]
self.ifjoin = [0,1] self.ifjoin = [0,1]
self.loop = loop
self.stableNum = stableNum
self.subtableNum = subtableNum
self.insertRows = insertRows
def SetThreadsNum(self,num): def SetThreadsNum(self,num):
self.numOfTherads=num self.numOfTherads=num
...@@ -88,9 +103,9 @@ class ConcurrentInquiry: ...@@ -88,9 +103,9 @@ class ConcurrentInquiry:
self.subtb_tag_list.append(tag) self.subtb_tag_list.append(tag)
def get_full(self): #获取所有的表、表结构 def get_full(self): #获取所有的表、表结构
host = "127.0.0.1" host = self.host
user = "root" user = self.user
password = "taosdata" password = self.password
conn = taos.connect( conn = taos.connect(
host, host,
user, user,
...@@ -117,7 +132,7 @@ class ConcurrentInquiry: ...@@ -117,7 +132,7 @@ class ConcurrentInquiry:
return 'where '+random.choice([' and ',' or ']).join(l) return 'where '+random.choice([' and ',' or ']).join(l)
def con_interval(self,tlist,col_list,tag_list): def con_interval(self,tlist,col_list,tag_list):
interval = 'interval(' + str(random.randint(0,100)) + random.choice(['a','s','d','w','n','y']) + ')' interval = 'interval(' + str(random.randint(0,20)) + random.choice(['a','s','d','w','n','y']) + ')'
return interval return interval
def con_limit(self,tlist,col_list,tag_list): def con_limit(self,tlist,col_list,tag_list):
...@@ -133,7 +148,7 @@ class ConcurrentInquiry: ...@@ -133,7 +148,7 @@ class ConcurrentInquiry:
def con_group(self,tlist,col_list,tag_list): def con_group(self,tlist,col_list,tag_list):
rand_tag = random.randint(0,5) rand_tag = random.randint(0,5)
rand_col = random.randint(0,1) rand_col = random.randint(0,1)
return 'group by '+','.join(random.sample(col_list,rand_col))+','.join(random.sample(tag_list,rand_tag)) return 'group by '+','.join(random.sample(col_list,rand_col) + random.sample(tag_list,rand_tag))
def con_order(self,tlist,col_list,tag_list): def con_order(self,tlist,col_list,tag_list):
return 'order by '+random.choice(tlist) return 'order by '+random.choice(tlist)
...@@ -165,8 +180,10 @@ class ConcurrentInquiry: ...@@ -165,8 +180,10 @@ class ConcurrentInquiry:
random.shuffle(func_list) random.shuffle(func_list)
sel_col_list=[] sel_col_list=[]
col_rand=random.randint(0,len(col_list)) col_rand=random.randint(0,len(col_list))
loop = 0
for i,j in zip(col_list[0:col_rand],func_list): #决定每个被查询col的函数 for i,j in zip(col_list[0:col_rand],func_list): #决定每个被查询col的函数
alias = ' as '+ str(i) alias = ' as '+ 'taos%d ' % loop
loop += 1
pick_func = '' pick_func = ''
if j == 'leastsquares': if j == 'leastsquares':
pick_func=j+'('+i+',1,1)' pick_func=j+'('+i+',1,1)'
...@@ -185,7 +202,7 @@ class ConcurrentInquiry: ...@@ -185,7 +202,7 @@ class ConcurrentInquiry:
for i in sel_con: for i in sel_con:
sel_con_list.append(i(tlist,col_list,tag_list)) #获取对应的条件函数 sel_con_list.append(i(tlist,col_list,tag_list)) #获取对应的条件函数
sql+=' '.join(sel_con_list) # condition sql+=' '.join(sel_con_list) # condition
print(sql) #print(sql)
return sql return sql
def gen_query_join(self): #生成join查询语句 def gen_query_join(self): #生成join查询语句
...@@ -236,8 +253,6 @@ class ConcurrentInquiry: ...@@ -236,8 +253,6 @@ class ConcurrentInquiry:
else: else:
join_section = ''.join(random.choices(col_intersection+tag_intersection)) join_section = ''.join(random.choices(col_intersection+tag_intersection))
sql += 'where t1._c0 = t2._c0 and ' + 't1.' + join_section + '=t2.' + join_section sql += 'where t1._c0 = t2._c0 and ' + 't1.' + join_section + '=t2.' + join_section
print(sql)
return sql return sql
def random_pick(self): def random_pick(self):
...@@ -248,16 +263,48 @@ class ConcurrentInquiry: ...@@ -248,16 +263,48 @@ class ConcurrentInquiry:
if x < cumulative_probability:break if x < cumulative_probability:break
return item return item
def gen_data(self):
stableNum = self.stableNum
subtableNum = self.subtableNum
insertRows = self.insertRows
t0 = self.ts
host = self.host
user = self.user
password = self.password
conn = taos.connect(
host,
user,
password,
)
cl = conn.cursor()
cl.execute("drop database if exists %s;" %self.dbname)
cl.execute("create database if not exists %s;" %self.dbname)
cl.execute("use %s" % self.dbname)
for k in range(stableNum):
sql="create table %s (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20)) \
tags(t1 int, t2 float, t3 bigint, t4 smallint, t5 tinyint, t6 double, t7 bool,t8 binary(20),t9 nchar(20))" % (self.stb_prefix+str(k))
cl.execute(sql)
for j in range(subtableNum):
sql = "create table %s using %s tags(%d,%d,%d,%d,%d,%d,%d,'%s','%s')" % \
(self.subtb_prefix+str(k)+'_'+str(j),self.stb_prefix+str(k),j,j/2.0,j%41,j%51,j%53,j*1.0,j%2,'taos'+str(j),'涛思'+str(j))
print(sql)
cl.execute(sql)
for i in range(insertRows):
ret = cl.execute(
"insert into %s values (%d , %d,%d,%d,%d,%d,%d,%d,'%s','%s')" %
(self.subtb_prefix+str(k)+'_'+str(j),t0+i,i%100,i/2.0,i%41,i%51,i%53,i*1.0,i%2,'taos'+str(i),'涛思'+str(i)))
cl.close()
conn.close()
def rest_query(self,sql): #rest 接口 def rest_query(self,sql): #rest 接口
host = "127.0.0.1" host = self.host
user = "root" user = self.user
password = "taosdata" password = self.password
port =6041 port =6041
url = "http://{}:{}/rest/sql".format(host, port ) url = "http://{}:{}/rest/sql".format(host, port )
try: try:
r = requests.post(url, r = requests.post(url,
data = 'use test', data = 'use %s' % self.dbname,
auth = HTTPBasicAuth('root', 'taosdata')) auth = HTTPBasicAuth('root', 'taosdata'))
r = requests.post(url, r = requests.post(url,
data = sql, data = sql,
...@@ -287,20 +334,20 @@ class ConcurrentInquiry: ...@@ -287,20 +334,20 @@ class ConcurrentInquiry:
def query_thread_n(self,threadID): #使用原生python接口查询 def query_thread_n(self,threadID): #使用原生python接口查询
host = "127.0.0.1" host = self.host
user = "root" user = self.user
password = "taosdata" password = self.password
conn = taos.connect( conn = taos.connect(
host, host,
user, user,
password, password,
) )
cl = conn.cursor() cl = conn.cursor()
cl.execute("use test;") cl.execute("use %s;" % self.dbname)
print("Thread %d: starting" % threadID) print("Thread %d: starting" % threadID)
loop = self.loop
while True: while loop:
try: try:
if self.random_pick(): if self.random_pick():
...@@ -314,33 +361,40 @@ class ConcurrentInquiry: ...@@ -314,33 +361,40 @@ class ConcurrentInquiry:
end = time.time() end = time.time()
print("time cost :",end-start) print("time cost :",end-start)
except Exception as e: except Exception as e:
print('-'*40)
print( print(
"Failure thread%d, sql: %s,exception: %s" % "Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e))) (threadID, str(sql),str(e)))
#exit(-1) #exit(-1)
loop -= 1
if loop == 0: break
cl.close()
conn.close()
print("Thread %d: finishing" % threadID) print("Thread %d: finishing" % threadID)
def query_thread_r(self,threadID): #使用rest接口查询 def query_thread_r(self,threadID): #使用rest接口查询
print("Thread %d: starting" % threadID) print("Thread %d: starting" % threadID)
while True: loop = self.loop
try: while loop:
if self.random_pick(): try:
sql=self.gen_query_sql() if self.random_pick():
else: sql=self.gen_query_sql()
sql=self.gen_query_join() else:
print("sql is ",sql) sql=self.gen_query_join()
start = time.time() print("sql is ",sql)
self.rest_query(sql) start = time.time()
end = time.time() self.rest_query(sql)
print("time cost :",end-start) end = time.time()
except Exception as e: print("time cost :",end-start)
print( except Exception as e:
"Failure thread%d, sql: %s,exception: %s" % print('-'*40)
(threadID, str(sql),str(e))) print(
#exit(-1) "Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e)))
#exit(-1)
loop -= 1
if loop == 0: break
print("Thread %d: finishing" % threadID) print("Thread %d: finishing" % threadID)
...@@ -355,10 +409,124 @@ class ConcurrentInquiry: ...@@ -355,10 +409,124 @@ class ConcurrentInquiry:
thread = threading.Thread(target=self.query_thread_r, args=(i,)) thread = threading.Thread(target=self.query_thread_r, args=(i,))
threads.append(thread) threads.append(thread)
thread.start() thread.start()
if len(sys.argv)>1:
q = ConcurrentInquiry(n_Therads=sys.argv[1],r_Therads=sys.argv[2]) parser = argparse.ArgumentParser()
else: parser.add_argument(
q = ConcurrentInquiry() '-H',
'--host-name',
action='store',
default='127.0.0.1',
type=str,
help='host name to be connected (default: 127.0.0.1)')
parser.add_argument(
'-S',
'--ts',
action='store',
default=1500000000000,
type=int,
help='insert data from timestamp (default: 1500000000000)')
parser.add_argument(
'-d',
'--db-name',
action='store',
default='test',
type=str,
help='Database name to be created (default: test)')
parser.add_argument(
'-t',
'--number-of-native-threads',
action='store',
default=10,
type=int,
help='Number of native threads (default: 10)')
parser.add_argument(
'-T',
'--number-of-rest-threads',
action='store',
default=10,
type=int,
help='Number of rest threads (default: 10)')
parser.add_argument(
'-r',
'--number-of-records',
action='store',
default=100,
type=int,
help='Number of record to be created for each table (default: 100)')
parser.add_argument(
'-c',
'--create-table',
action='store',
default='0',
type=int,
help='whether gen data (default: 0)')
parser.add_argument(
'-p',
'--subtb-name-prefix',
action='store',
default='t',
type=str,
help='subtable-name-prefix (default: t)')
parser.add_argument(
'-P',
'--stb-name-prefix',
action='store',
default='st',
type=str,
help='stable-name-prefix (default: st)')
parser.add_argument(
'-b',
'--probabilities',
action='store',
default='0.05',
type=float,
help='probabilities of join (default: 0.05)')
parser.add_argument(
'-l',
'--loop-per-thread',
action='store',
default='100',
type=int,
help='loop per thread (default: 100)')
parser.add_argument(
'-u',
'--user',
action='store',
default='root',
type=str,
help='user name')
parser.add_argument(
'-w',
'--password',
action='store',
default='root',
type=str,
help='user name')
parser.add_argument(
'-n',
'--number-of-tables',
action='store',
default=1000,
type=int,
help='Number of subtales per stable (default: 1000)')
parser.add_argument(
'-N',
'--number-of-stables',
action='store',
default=2,
type=int,
help='Number of stables (default: 2)')
args = parser.parse_args()
q = ConcurrentInquiry(
args.ts,args.host_name,args.user,args.password,args.db_name,
args.stb_name_prefix,args.subtb_name_prefix,args.number_of_native_threads,args.number_of_rest_threads,
args.probabilities,args.loop_per_thread,args.number_of_stables,args.number_of_tables ,args.number_of_records )
if args.create_table:
q.gen_data()
q.get_full() q.get_full()
#q.gen_query_sql() #q.gen_query_sql()
q.run() q.run()
...@@ -255,17 +255,24 @@ class TDDnode: ...@@ -255,17 +255,24 @@ class TDDnode:
tdLog.exit(cmd) tdLog.exit(cmd)
self.running = 1 self.running = 1
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
time.sleep(0.1) if self.valgrind == 0:
key = 'from offline to online' time.sleep(0.1)
bkey = bytes(key,encoding="utf8") key = 'from offline to online'
logFile = self.logDir + "/taosdlog.0" bkey = bytes(key,encoding="utf8")
popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) logFile = self.logDir + "/taosdlog.0"
while True: popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
line = popen.stdout.readline().strip() pid = popen.pid
if bkey in line: print('Popen.pid:' + str(pid))
popen.kill() while True:
break line = popen.stdout.readline().strip()
tdLog.debug("the dnode:%d has been started." % (self.index)) if bkey in line:
print(line)
popen.kill()
break
tdLog.debug("the dnode:%d has been started." % (self.index))
else:
tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index))
time.sleep(5)
# time.sleep(5) # time.sleep(5)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册