concurrent_inquiry.py 20.5 KB
Newer Older
L
liuyq-617 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
import threading
import taos
L
liuyq-617 已提交
15
import sys
L
liuyq-617 已提交
16 17 18
import json
import time
import random
19
import requests
L
liuyq-617 已提交
20
import argparse
21 22 23 24 25 26 27 28 29 30 31 32
from requests.auth import HTTPBasicAuth
# SQL functions that gen_query_sql() may wrap around a selected column.
func_list = [
    'avg', 'count', 'twa', 'sum', 'stddev', 'leastsquares', 'min',
    'max', 'first', 'last', 'top', 'bottom', 'percentile', 'apercentile',
    'last_row', 'diff', 'spread',
]

# Sample clause fragments, one per clause kind.
condition_list = [
    "where _c0 > now -10d ",
    'interval(10s)',
    'limit 10',
    'group by',
    'order by',
    'fill(null)',
]

# Predicate fragments used by con_where(): the first one is a complete
# predicate, the others are suffixes appended to a randomly chosen field name.
where_list = ['_c0>now-10d', ' <50', " like '%a%'"]
L
liuyq-617 已提交
35
class ConcurrentInquiry:
    """Run randomly generated queries against a database from many threads.

    Typical use: optionally create test data with ``gen_data()``, discover
    the existing tables and their structure with ``get_full()``, then call
    ``run()`` to start worker threads that execute random SQL through the
    native ``taos`` connector and through the REST endpoint.

    The generated SQL is intentionally loose (random clause order, a field
    name that does not exist, ...); failures reported by the server are
    printed by the worker threads and do not stop the run.
    """

    # TCP port of the REST endpoint used by rest_query().
    REST_PORT = 6041

    # def __init__(self,ts=1500000001000,host='127.0.0.1',user='root',password='taosdata',dbname='test',
    #             stb_prefix='st',subtb_prefix='t',n_Therads=10,r_Therads=10,probabilities=0.05,loop=5,
    #             stableNum = 2,subtableNum = 1000,insertRows = 100):
    def __init__(self, ts, host, user, password, dbname,
                 stb_prefix, subtb_prefix, n_Therads, r_Therads, probabilities, loop,
                 stableNum, subtableNum, insertRows, mix_table):
        """Store the run configuration; no connection is opened here.

        Args:
            ts: timestamp of the first generated row (row ``i`` uses ``ts + i``).
            host, user, password: server address and credentials.
            dbname: database to create / query.
            stb_prefix, subtb_prefix: name prefixes for generated super tables
                and sub tables.
            n_Therads: number of worker threads using the native connector.
            r_Therads: number of worker threads using the REST endpoint.
            probabilities: probability that a generated query is a join.
            loop: number of queries each worker thread executes.
            stableNum: number of super tables created by ``gen_data``.
            subtableNum: number of sub tables created per super table.
            insertRows: number of rows inserted into each sub table.
            mix_table: 0 = query super tables and sub tables, 1 = sub tables
                only, any other value = super tables only.
        """
        self.n_numOfTherads = n_Therads
        self.r_numOfTherads = r_Therads
        self.ts = ts
        self.host = host
        self.user = user
        self.password = password
        self.dbname = dbname
        self.stb_prefix = stb_prefix
        self.subtb_prefix = subtb_prefix
        # Table names and, index-aligned with them, their columns and tags.
        self.stb_list = []
        self.subtb_list = []
        self.stb_stru_list = []
        self.subtb_stru_list = []
        self.stb_tag_list = []
        self.subtb_tag_list = []
        # random_pick() returns ifjoin[k] with probability probabilities[k];
        # the worker threads generate a join when it returns 0.
        self.probabilities = [probabilities, 1 - probabilities]
        self.ifjoin = [0, 1]
        self.loop = loop
        self.stableNum = stableNum
        self.subtableNum = subtableNum
        self.insertRows = insertRows
        self.mix_table = mix_table

    def SetThreadsNum(self, num):
        """Store ``num`` in ``numOfTherads``.

        NOTE(review): nothing in this class reads ``numOfTherads``; ``run``
        uses ``n_numOfTherads`` and ``r_numOfTherads``.
        """
        self.numOfTherads = num

    def _connect(self):
        """Open a native connection with the configured host and credentials."""
        # Keyword arguments are the form documented by the connector, so the
        # configured host/user/password are passed by name.
        return taos.connect(host=self.host, user=self.user, password=self.password)

    def ret_fcol(self, cl, sql):
        """Execute ``sql`` on cursor ``cl`` and return the first column of each row."""
        cl.execute(sql)
        return [row[0] for row in cl]

    def r_stb_list(self, cl):
        """Load the names of all super tables of the database into ``stb_list``."""
        self.stb_list = self.ret_fcol(cl, 'show ' + self.dbname + '.stables')

    def r_subtb_list(self, cl, stablename):
        """Append two sub table names of ``stablename`` to ``subtb_list``."""
        sql = 'select tbname from ' + self.dbname + '.' + stablename + ' limit 2;'
        self.subtb_list += self.ret_fcol(cl, sql)

    def cal_struct(self, cl, tbname):
        """Describe ``tbname`` and split its fields into columns and tags.

        A row of the ``describe`` result is treated as a tag when its fourth
        field is truthy, otherwise as a column.

        Returns:
            ``(columns, tags)`` - two lists of field names.
        """
        cl.execute('describe ' + self.dbname + '.' + tbname + ';')
        columns, tags = [], []
        for row in cl:
            (tags if row[3] else columns).append(row[0])
        return columns, tags

    def r_stb_stru(self, cl):
        """Record columns and tags of every super table in ``stb_list``."""
        for name in self.stb_list:
            columns, tags = self.cal_struct(cl, name)
            self.stb_stru_list.append(columns)
            self.stb_tag_list.append(tags)

    def r_subtb_stru(self, cl):
        """Record columns and tags of every sub table in ``subtb_list``."""
        for name in self.subtb_list:
            columns, tags = self.cal_struct(cl, name)
            self.subtb_stru_list.append(columns)
            self.subtb_tag_list.append(tags)

    def get_full(self):
        """Discover all tables and their structure.

        Fills ``stb_list`` / ``subtb_list`` and the index-aligned structure
        and tag lists. The lists are reset first so that a repeated call
        does not accumulate duplicates, and the cursor/connection are
        closed even when a statement fails.
        """
        conn = self._connect()
        try:
            cl = conn.cursor()
            try:
                self.subtb_list = []
                self.stb_stru_list = []
                self.stb_tag_list = []
                self.subtb_stru_list = []
                self.subtb_tag_list = []
                self.r_stb_list(cl)
                for name in self.stb_list:
                    self.r_subtb_list(cl, name)
                self.r_stb_stru(cl)
                self.r_subtb_stru(cl)
            finally:
                cl.close()
        finally:
            conn.close()

    # ----------------------------------------------------------------
    # Query clause generators. They share the signature
    # (tlist, col_list, tag_list) so gen_query_sql() can call any of them
    # the same way; each one uses only the arguments it needs.
    # ----------------------------------------------------------------
    def con_where(self, tlist, col_list, tag_list):
        """Return a WHERE clause with 0..len(tlist) random predicates.

        All predicates are joined with the same randomly chosen connector
        (' and ' or ' or '). With zero predicates the result is the bare
        ``'where '`` keyword.
        """
        predicates = []
        for _ in range(random.randint(0, len(tlist))):
            c = random.choice(where_list)
            if c == '_c0>now-10d':
                predicates.append(c)
            else:
                predicates.append(random.choice(tlist) + c)
        return 'where ' + random.choice([' and ', ' or ']).join(predicates)

    def con_interval(self, tlist, col_list, tag_list):
        """Return ``interval(<0..20><unit>)`` with a random unit letter."""
        return ('interval(' + str(random.randint(0, 20))
                + random.choice(['a', 's', 'd', 'w', 'n', 'y']) + ')')

    def con_limit(self, tlist, col_list, tag_list):
        """Return one random limit/offset/slimit/soffset combination."""
        rand1 = str(random.randint(0, 1000))
        rand2 = str(random.randint(0, 1000))
        return random.choice([
            'limit ' + rand1,
            'limit ' + rand1 + ' offset ' + rand2,
            ' slimit ' + rand1,
            ' slimit ' + rand1 + ' offset ' + rand2,
            'limit ' + rand1 + ' slimit ' + rand2,
            # The separating space after 'offset' was missing here, which
            # glued the keyword to the number.
            'limit ' + rand1 + ' offset ' + rand2 + ' slimit ' + rand1 + ' soffset ' + rand2,
        ])

    def con_fill(self, tlist, col_list, tag_list):
        """Return one random fill clause."""
        return random.choice(['fill(null)', 'fill(prev)', 'fill(none)', 'fill(LINEAR)'])

    def con_group(self, tlist, col_list, tag_list):
        """Return a GROUP BY clause over at most one column and five tags."""
        # Clamp the counts to the list sizes: random.sample() raises
        # ValueError when asked for more items than the list holds.
        rand_tag = random.randint(0, min(5, len(tag_list)))
        rand_col = random.randint(0, min(1, len(col_list)))
        return 'group by ' + ','.join(
            random.sample(col_list, rand_col) + random.sample(tag_list, rand_tag))

    def con_order(self, tlist, col_list, tag_list):
        """Return an ORDER BY clause on one random field of ``tlist``."""
        return 'order by ' + random.choice(tlist)

    def _pick_table(self):
        """Pick the table to query according to ``mix_table``.

        Returns:
            ``(name, columns, tags)`` of one randomly chosen table.

        Raises:
            ValueError: when there is no table of the requested kind.
        """
        n_stb = len(self.stb_list)
        n_sub = len(self.subtb_list)
        if self.mix_table == 0:
            idx = random.randrange(n_stb + n_sub)
        elif self.mix_table == 1:
            idx = n_stb + random.randrange(n_sub)
        else:
            idx = random.randrange(n_stb)
        if idx < n_stb:
            return self.stb_list[idx], self.stb_stru_list[idx], self.stb_tag_list[idx]
        idx -= n_stb
        return self.subtb_list[idx], self.subtb_stru_list[idx], self.subtb_tag_list[idx]

    def gen_query_sql(self):
        """Generate one random single-table query.

        The selected columns come from the same table that is used in the
        FROM clause. Each selected column is wrapped in a distinct random
        function and may get an alias. The select list may be empty and the
        clauses are appended in random order, so the statement is not
        guaranteed to be valid SQL.
        """
        tbname, col_list, tag_list = self._pick_table()
        # Add the non-existent field 'abc' to see whether it triggers new bugs.
        tlist = col_list + tag_list + ['abc']

        # Work on shuffled copies: the structure lists and func_list are
        # shared by all worker threads and must not be reordered in place.
        columns = random.sample(col_list, random.randint(0, len(col_list)))
        funcs = random.sample(func_list, len(func_list))

        sel_col_list = []
        for n, (col, func) in enumerate(zip(columns, funcs)):
            if func == 'leastsquares':
                pick_func = func + '(' + col + ',1,1)'
            elif func in ('top', 'bottom', 'percentile', 'apercentile'):
                pick_func = func + '(' + col + ',1)'
            else:
                pick_func = func + '(' + col + ')'
            if bool(random.getrandbits(1)):
                pick_func += ' as ' + 'taos%d ' % n
            sel_col_list.append(pick_func)

        sql = 'select ' + ','.join(sel_col_list) + ' from ' + tbname + ' '

        con_func = [self.con_where, self.con_interval, self.con_limit,
                    self.con_group, self.con_order, self.con_fill]
        sel_con = random.sample(con_func, random.randint(0, len(con_func)))
        sql += ' '.join(con(tlist, col_list, tag_list) for con in sel_con)
        return sql

    def _pick_join_tables(self, use_subtables):
        """Pick two distinct tables of one kind for a join.

        Returns:
            ``(names, columns, tags)`` - three two-element lists.

        Raises:
            ValueError: when fewer than two tables of that kind are known.
        """
        if use_subtables:
            names, strus, tags = self.subtb_list, self.subtb_stru_list, self.subtb_tag_list
        else:
            names, strus, tags = self.stb_list, self.stb_stru_list, self.stb_tag_list
        picked = random.sample(range(len(names)), 2)
        return ([names[i] for i in picked],
                [strus[i] for i in picked],
                [tags[i] for i in picked])

    def gen_query_join(self):
        """Generate one random two-table join query.

        Joins two sub tables or two super tables depending on ``mix_table``
        (chosen at random when it is 0). Sub tables are joined on a common
        column, super tables on a common column or tag.
        """
        if self.mix_table == 0:
            subtable = bool(random.getrandbits(1))
        else:
            subtable = self.mix_table == 1

        tbname, col_list, tag_list = self._pick_join_tables(subtable)
        col_intersection = list(set(col_list[0]).intersection(col_list[1]))
        tag_intersection = list(set(tag_list[0]).intersection(tag_list[1]))

        sql = 'select '
        if bool(random.getrandbits(1)):
            sql += '*'
        else:
            sql += ','.join([
                't1.' + str(random.choice(col_list[0] + tag_list[0])),
                't2.' + str(random.choice(col_list[1] + tag_list[1])),
            ])
        sql += ' from ' + str(tbname[0]) + ' t1,' + str(tbname[1]) + ' t2 '

        if subtable:
            join_section = random.choice(col_intersection)
        else:
            join_section = random.choice(col_intersection + tag_intersection)
        sql += ('where t1._c0 = t2._c0 and ' + 't1.' + str(join_section)
                + '=t2.' + str(join_section))
        return sql

    def random_pick(self):
        """Return an item of ``ifjoin`` drawn with the weights in ``probabilities``."""
        x = random.uniform(0, 1)
        cumulative_probability = 0.0
        item = None
        for item, item_probability in zip(self.ifjoin, self.probabilities):
            cumulative_probability += item_probability
            if x < cumulative_probability:
                break
        return item

    def gen_data(self):
        """Drop and recreate the database, then create and fill the test tables.

        Creates ``stableNum`` super tables, ``subtableNum`` sub tables per
        super table and ``insertRows`` rows per sub table. The
        cursor/connection are closed even when a statement fails.
        """
        t0 = self.ts
        conn = self._connect()
        try:
            cl = conn.cursor()
            try:
                cl.execute("drop database if  exists %s;" % self.dbname)
                cl.execute("create database if not exists %s;" % self.dbname)
                cl.execute("use %s" % self.dbname)
                for k in range(self.stableNum):
                    stb_name = self.stb_prefix + str(k)
                    sql = ("create table %s (ts timestamp, c1 int, c2 float, c3 bigint, "
                           "c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20)) "
                           "tags(t1 int, t2 float, t3 bigint, t4 smallint, t5 tinyint, "
                           "t6 double, t7 bool,t8 binary(20),t9 nchar(20))" % stb_name)
                    cl.execute(sql)
                    for j in range(self.subtableNum):
                        tb_name = self.subtb_prefix + str(k) + '_' + str(j)
                        # %f for the float/double fields: %d would silently
                        # truncate the fractional part.
                        sql = "create table %s using %s tags(%d,%f,%d,%d,%d,%f,%d,'%s','%s')" % \
                            (tb_name, stb_name, j, j / 2.0, j % 41, j % 51, j % 53, j * 1.0,
                             j % 2, 'taos' + str(j), '涛思' + str(j))
                        print(sql)
                        cl.execute(sql)
                        for i in range(self.insertRows):
                            cl.execute(
                                "insert into %s values (%d , %d,%f,%d,%d,%d,%f,%d,'%s','%s')" %
                                (tb_name, t0 + i, i % 100, i / 2.0, i % 41, i % 51, i % 53,
                                 i * 1.0, i % 2, 'taos' + str(i), '涛思' + str(i)))
            finally:
                cl.close()
        finally:
            conn.close()

    def rest_query(self, sql):
        """Execute ``sql`` through the REST endpoint and return the row count.

        Returns:
            The ``rows`` field of the response, or 0 when it is absent.

        Raises:
            RuntimeError: the response has no ``status``, an error response
                has no ``code``, or the status is neither 'succ' nor 'error'.
            taos.error.ProgrammingError: the server reported an error.
        """
        url = "http://{}:{}/rest/sql".format(self.host, self.REST_PORT)
        # Authenticate with the configured credentials instead of fixed ones.
        auth = HTTPBasicAuth(self.user, self.password)
        try:
            requests.post(url, data='use %s' % self.dbname, auth=auth)
            r = requests.post(url, data=sql, auth=auth)
        except requests.exceptions.RequestException:
            print("REST API Failure (TODO: more info here)")
            raise
        rj = r.json()
        if 'status' not in rj:
            raise RuntimeError("No status in REST response")

        status = rj['status']
        if status == 'error':  # clearly reported error
            if 'code' not in rj:  # error without code
                raise RuntimeError("REST error return without code")
            # 'desc' may be missing; do not turn that into a KeyError.
            raise taos.error.ProgrammingError(
                rj.get('desc', 'no description in REST error response'), rj['code'])

        if status != 'succ':  # better be this
            raise RuntimeError(
                "Unexpected REST return status: {}".format(status))

        return rj.get('rows', 0)

    def _query_loop(self, threadID, execute):
        """Generate and execute ``self.loop`` random queries.

        ``execute`` is called with each SQL string. Any exception is printed
        together with the SQL and the loop continues, except that a failure
        mentioning 'Unable to establish connection' raises ``SystemExit``.
        """
        for _ in range(self.loop):
            # Bound before the try block so the failure report below can
            # always print it, even when query generation itself fails.
            sql = ''
            try:
                if self.random_pick():
                    sql = self.gen_query_sql()
                else:
                    sql = self.gen_query_join()
                print("sql is ", sql)
                start = time.time()
                execute(sql)
                end = time.time()
                print("time cost :", end - start)
            except Exception as e:
                print('-' * 40)
                print("Failure thread%d, sql: %s \nexception: %s" %
                      (threadID, str(sql), str(e)))
                if 'Unable to establish connection' in str(e):
                    # Raises SystemExit in the calling (worker) thread.
                    sys.exit(-1)

    def query_thread_n(self, threadID):
        """Worker body: run random queries through the native connector."""
        conn = self._connect()
        try:
            cl = conn.cursor()
            try:
                cl.execute("use %s;" % self.dbname)
                print("Thread %d: starting" % threadID)

                def execute(sql):
                    cl.execute(sql)
                    cl.fetchall()

                self._query_loop(threadID, execute)
            finally:
                cl.close()
        finally:
            conn.close()
        print("Thread %d: finishing" % threadID)

    def query_thread_r(self, threadID):
        """Worker body: run random queries through the REST endpoint."""
        print("Thread %d: starting" % threadID)
        self._query_loop(threadID, self.rest_query)
        print("Thread %d: finishing" % threadID)

    def run(self):
        """Start all native and REST worker threads and wait for them to finish."""
        print(self.n_numOfTherads, self.r_numOfTherads)
        threads = []
        workers = ((self.query_thread_n, self.n_numOfTherads),
                   (self.query_thread_r, self.r_numOfTherads))
        for target, count in workers:
            for i in range(count):
                thread = threading.Thread(target=target, args=(i,))
                threads.append(thread)
                thread.start()
        for thread in threads:
            thread.join()
liuyq-617 已提交
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
 
def build_parser():
    """Build the command line parser of the concurrent query tool."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-H', '--host-name', default='127.0.0.1', type=str,
        help='host name to be connected (default: 127.0.0.1)')
    parser.add_argument(
        '-S', '--ts', default=1500000000000, type=int,
        help='insert data from timestamp (default: 1500000000000)')
    parser.add_argument(
        '-d', '--db-name', default='test', type=str,
        help='Database name to be created (default: test)')
    parser.add_argument(
        '-t', '--number-of-native-threads', default=10, type=int,
        help='Number of native threads (default: 10)')
    parser.add_argument(
        '-T', '--number-of-rest-threads', default=10, type=int,
        help='Number of rest threads (default: 10)')
    parser.add_argument(
        '-r', '--number-of-records', default=100, type=int,
        help='Number of record to be created for each table  (default: 100)')
    parser.add_argument(
        '-c', '--create-table', default=0, type=int,
        help='whether gen data (default: 0)')
    parser.add_argument(
        '-p', '--subtb-name-prefix', default='t', type=str,
        help='subtable-name-prefix (default: t)')
    parser.add_argument(
        '-P', '--stb-name-prefix', default='st', type=str,
        help='stable-name-prefix (default: st)')
    parser.add_argument(
        '-b', '--probabilities', default=0.05, type=float,
        help='probabilities of join (default: 0.05)')
    parser.add_argument(
        '-l', '--loop-per-thread', default=100, type=int,
        help='loop per thread (default: 100)')
    parser.add_argument(
        '-u', '--user', default='root', type=str,
        help='user name (default: root)')
    # The default used to be 'root' with the help text 'user name'. The
    # commented-out constructor defaults in ConcurrentInquiry and its REST
    # code both use 'taosdata', so that is used as the default here too.
    parser.add_argument(
        '-w', '--password', default='taosdata', type=str,
        help='password (default: taosdata)')
    parser.add_argument(
        '-n', '--number-of-tables', default=1000, type=int,
        help='Number of subtables per stable (default: 1000)')
    parser.add_argument(
        '-N', '--number-of-stables', default=2, type=int,
        help='Number of stables  (default: 2)')
    parser.add_argument(
        '-m', '--mix-stable-subtable', default=0, type=int,
        help='0:stable & subtable ,1:subtable ,2:stable (default: 0)')
    return parser


def main(argv=None):
    """Parse ``argv`` (``sys.argv[1:]`` when None) and run the workload.

    Optionally generates the test data, then discovers the tables and
    starts the query threads.
    """
    args = build_parser().parse_args(argv)
    q = ConcurrentInquiry(
        args.ts, args.host_name, args.user, args.password, args.db_name,
        args.stb_name_prefix, args.subtb_name_prefix,
        args.number_of_native_threads, args.number_of_rest_threads,
        args.probabilities, args.loop_per_thread,
        args.number_of_stables, args.number_of_tables, args.number_of_records,
        args.mix_stable_subtable)

    if args.create_table:
        q.gen_data()
    q.get_full()

    #q.gen_query_sql()
    q.run()


# Only run when executed as a script, so importing the module does not
# parse sys.argv or touch the database.
if __name__ == "__main__":
    main()
L
liuyq-617 已提交
567