concurrent_inquiry.py 10.1 KB
Newer Older
L
liuyq-617 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
import threading
import taos
L
liuyq-617 已提交
15
import sys
L
liuyq-617 已提交
16 17 18
import json
import time
import random
19 20 21 22 23 24 25 26 27 28 29 30 31
import requests
from requests.auth import HTTPBasicAuth
func_list=['avg','count','twa','sum','stddev','leastsquares','min',
'max','first','last','top','bottom','percentile','apercentile',
'last_row','diff','spread']
condition_list=[
    "where _c0 > now -10d ",
    'interval(10s)',
    'limit 10',
    'group by',
    'order by',
    'fill(null)'
    
L
liuyq-617 已提交
32
]
33
where_list = ['_c0>now-10d',' <50'," like \'%a%\'"]
L
liuyq-617 已提交
34
class ConcurrentInquiry:
35 36 37
    def __init__(self,n_Therads=25,r_Therads=25):  
        self.n_numOfTherads = n_Therads
        self.r_numOfTherads = r_Therads
L
liuyq-617 已提交
38
        self.ts=1500000001000
39 40 41 42 43 44 45
        self.dbname='test'
        self.stb_list=[]
        self.subtb_list=[]
        self.stb_stru_list=[]
        self.subtb_stru_list=[]
        self.stb_tag_list=[]
        self.subtb_tag_list=[]
46

L
liuyq-617 已提交
47 48
    def SetThreadsNum(self,num):
        self.numOfTherads=num
49

50 51 52 53 54 55
    def ret_fcol(self,cl,sql):                     #返回结果的第一列
        cl.execute(sql)
        fcol_list=[]
        for data in cl:
            fcol_list.append(data[0])
        return fcol_list
56 57

    def r_stb_list(self,cl):                    #返回超级表列表
58 59
        sql='show '+self.dbname+'.stables'
        self.stb_list=self.ret_fcol(cl,sql)
60 61

    def r_subtb_list(self,cl,stablename):       #每个超级表返回2个子表
62 63
        sql='select tbname from '+self.dbname+'.'+stablename+' limit 2;'
        self.subtb_list+=self.ret_fcol(cl,sql)
64 65

    def cal_struct(self,cl,tbname):             #查看表结构
66 67 68 69 70 71 72 73 74 75
        tb=[]
        tag=[]
        sql='describe '+self.dbname+'.'+tbname+';'
        cl.execute(sql)
        for data in cl:
            if data[3]:
                tag.append(data[0])
            else:
                tb.append(data[0])
        return tb,tag
76 77

    def r_stb_stru(self,cl):                    #获取所有超级表的表结构
78 79 80 81
        for i in self.stb_list:
            tb,tag=self.cal_struct(cl,i)
            self.stb_stru_list.append(tb)
            self.stb_tag_list.append(tag)
82 83

    def r_subtb_stru(self,cl):                  #返回所有子表的表结构
84 85 86 87
        for i in self.subtb_list:
            tb,tag=self.cal_struct(cl,i)
            self.subtb_stru_list.append(tb)
            self.subtb_tag_list.append(tag)
88 89

    def get_full(self):                         #获取所有的表、表结构
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
        host = "127.0.0.1"
        user = "root"
        password = "taosdata"
        conn = taos.connect(
            host,
            user,
            password,
            )
        cl = conn.cursor()
        self.r_stb_list(cl)
        for i in self.stb_list:
            self.r_subtb_list(cl,i)
        self.r_stb_stru(cl)
        self.r_subtb_stru(cl)
        cl.close()
105 106 107 108
        conn.close()  
        
    #query condition
    def con_where(self,tlist):                               
109 110 111 112 113 114 115 116
        l=[]
        for i in range(random.randint(0,len(tlist))):
            c = random.choice(where_list)
            if c == '_c0>now-10d':
                l.append(c)
            else:
                l.append(random.choice(tlist)+c)
        return 'where '+random.choice([' and ',' or ']).join(l)
117 118

    def con_interval(self,tlist):               
119
        return random.choice(['interval(10s)','interval(10d)','interval(1n)'])
120

121 122
    def con_limit(self,tlist):
        return random.choice(['limit 10','limit 10 offset 10','slimit 10','slimit 10 offset 10','limit 10 slimit 10','limit 10 offset 5 slimit 5 soffset 10'])
123
    
124 125
    def con_fill(self,tlist):
        return random.choice(['fill(null)','fill(prev)','fill(none)','fill(LINEAR)'])
126
    
127 128
    def con_group(self,tlist):
        return 'group by '+random.choice(tlist)
129
    
130 131
    def con_order(self,tlist):
        return 'order by '+random.choice(tlist)
132 133 134
    
    def gen_query_sql(self):                        #生成查询语句
        tbi=random.randint(0,len(self.subtb_list)+len(self.stb_list))  #随机决定查询哪张表
135 136 137 138 139 140 141 142 143 144 145 146 147 148
        tbname=''
        col_list=[]
        tag_list=[]
        is_stb=0
        if tbi>len(self.stb_list) :
            tbi=tbi-len(self.stb_list)
            tbname=self.subtb_list[tbi-1]
            col_list=self.subtb_stru_list[tbi-1]
            tag_list=self.subtb_tag_list[tbi-1]
        else:
            tbname=self.stb_list[tbi-1]
            col_list=self.stb_stru_list[tbi-1]
            tag_list=self.stb_tag_list[tbi-1]
            is_stb=1
149
        tlist=col_list+tag_list+['abc']            #增加不存在的域'abc',是否会引起新bug
150 151 152 153 154 155 156 157 158 159
        con_rand=random.randint(0,len(condition_list))
        func_rand=random.randint(0,len(func_list))
        col_rand=random.randint(0,len(col_list))
        tag_rand=random.randint(0,len(tag_list))
        t_rand=random.randint(0,len(tlist))
        sql='select '                                           #select 
        random.shuffle(col_list)
        random.shuffle(func_list)
        sel_col_list=[]
        col_rand=random.randint(0,len(col_list))
160
        for i,j in zip(col_list[0:col_rand],func_list):         #决定每个被查询col的函数
161 162 163 164 165 166 167 168 169 170 171
            if j == 'leastsquares':
                sel_col_list.append(j+'('+i+',1,1)')
            elif j == 'top' or j == 'bottom' or j == 'percentile' or j == 'apercentile':
                sel_col_list.append(j+'('+i+',1)')
            else:
                sel_col_list.append(j+'('+i+')')
        sql=sql+','.join(sel_col_list)+' from '+random.choice(self.stb_list+self.subtb_list)+' '                        #select col & func
        con_func=[self.con_where,self.con_interval,self.con_limit,self.con_group,self.con_order,self.con_fill]
        sel_con=random.sample(con_func,random.randint(0,len(con_func)))
        sel_con_list=[]
        for i in sel_con:
172
            sel_con_list.append(i(tlist))                                  #获取对应的条件函数
173 174 175
        sql+=' '.join(sel_con_list)                                       # condition
        print(sql)
        return sql
176 177

    def rest_query(self,sql):                                       #rest 接口
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
        host = "127.0.0.1"
        user = "root"
        password = "taosdata"
        port =6041
        url = "http://{}:{}/rest/sql".format(host, port )
        try:
            r = requests.post(url, 
                data = 'use test',
                auth = HTTPBasicAuth('root', 'taosdata'))  
            r = requests.post(url, 
                data = sql,
                auth = HTTPBasicAuth('root', 'taosdata'))         
        except:
            print("REST API Failure (TODO: more info here)")
            raise
        rj = r.json()
        if ('status' not in rj):
            raise RuntimeError("No status in REST response")

        if rj['status'] == 'error':  # clearly reported error
            if ('code' not in rj):  # error without code
                raise RuntimeError("REST error return without code")
            errno = rj['code']  # May need to massage this in the future
            # print("Raising programming error with REST return: {}".format(rj))
            raise taos.error.ProgrammingError(
                rj['desc'], errno)  # todo: check existance of 'desc'

        if rj['status'] != 'succ':  # better be this
            raise RuntimeError(
                "Unexpected REST return status: {}".format(
                    rj['status']))

        nRows = rj['rows'] if ('rows' in rj) else 0
        return nRows
212 213

    def query_thread_n(self,threadID):                      #使用原生python接口查询
214
        host = "127.0.0.1"
L
liuyq-617 已提交
215 216 217 218 219 220 221 222
        user = "root"
        password = "taosdata"
        conn = taos.connect(
            host,
            user,
            password,
            )
        cl = conn.cursor()
L
liuyq-617 已提交
223
        cl.execute("use test;")
L
liuyq-617 已提交
224 225 226 227
        
        print("Thread %d: starting" % threadID)
        
        while True:
228
            
L
liuyq-617 已提交
229
                try:
230 231
                    sql=self.gen_query_sql()
                    print("sql is ",sql)
L
liuyq-617 已提交
232
                    start = time.time()
233 234
                    cl.execute(sql)
                    cl.fetchall()
L
liuyq-617 已提交
235 236
                    end = time.time()
                    print("time cost :",end-start)
L
liuyq-617 已提交
237 238 239
                except Exception as e:
                    print(
                "Failure thread%d, sql: %s,exception: %s" %
240 241
                (threadID, str(sql),str(e)))
                    #exit(-1)
L
liuyq-617 已提交
242 243
                    
                
244
        print("Thread %d: finishing" % threadID)
L
liuyq-617 已提交
245
          
246
    def query_thread_r(self,threadID):                      #使用rest接口查询
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
        print("Thread %d: starting" % threadID)
        while True:
                try:
                    sql=self.gen_query_sql()
                    print("sql is ",sql)
                    start = time.time()
                    self.rest_query(sql)
                    end = time.time()
                    print("time cost :",end-start)
                except Exception as e:
                    print(
                "Failure thread%d, sql: %s,exception: %s" %
                (threadID, str(sql),str(e)))
                    #exit(-1)
                    
                
        print("Thread %d: finishing" % threadID)    
L
liuyq-617 已提交
264 265

    def run(self):
266
        print(self.n_numOfTherads,self.r_numOfTherads)  
L
liuyq-617 已提交
267
        threads = []
268 269
        for i in range(self.n_numOfTherads):
            thread = threading.Thread(target=self.query_thread_n, args=(i,))
L
liuyq-617 已提交
270 271
            threads.append(thread)
            thread.start()  
272 273 274 275 276 277 278 279 280 281 282
        for i in range(self.r_numOfTherads):
        # for i in range(1):
            thread = threading.Thread(target=self.query_thread_r, args=(i,))
            threads.append(thread)
            thread.start()
if len(sys.argv)>1:
    q = ConcurrentInquiry(n_Therads=sys.argv[1],r_Therads=sys.argv[2])
else:
    q = ConcurrentInquiry()
q.get_full()
#q.gen_query_sql()
L
liuyq-617 已提交
283
q.run()