################################################################### # Copyright (c) 2016 by TAOS Technologies, Inc. # All rights reserved. # # This file is proprietary and confidential to TAOS Technologies. # No part of this file may be reproduced, stored, transmitted, # disclosed or used in any form or by any means other than as # expressly provided by the written permission from Jianhui Tao # ################################################################### # -*- coding: utf-8 -*- import sys import os import threading as thd import multiprocessing as mp from numpy.lib.function_base import insert import taos from util.log import * from util.cases import * from util.sql import * import numpy as np import datetime as dt import time # constant define WAITS = 5 # wait seconds class TDTestCase: # # --------------- main frame ------------------- # clientCfgDict = {'queryproxy': '1'} clientCfgDict["queryproxy"] = '2' updatecfgDict = {'clientCfg': {}} updatecfgDict["clientCfg"] = clientCfgDict def caseDescription(self): ''' limit and offset keyword function test cases; case1: limit offset base function test case2: offset return valid ''' return def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) if ("community" in selfPath): projPath = selfPath[:selfPath.find("community")] else: projPath = selfPath[:selfPath.find("tests")] for root, dirs, files in os.walk(projPath): if ("taosd" in files): rootRealPath = os.path.dirname(os.path.realpath(root)) if ("packaging" not in rootRealPath): buildPath = root[:len(root)-len("/build/bin")] break return buildPath # init def init(self, conn, logSql): tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor()) # tdSql.prepare() # self.create_tables(); self.ts = 1500000000000 # stop def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) # --------------- case ------------------- def newcur(self,host,cfg): user = "root" password = "taosdata" port =6030 con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) cur=con.cursor() print(cur) return cur # create tables def create_tables(self,host,dbname,stbname,count): buildPath = self.getBuildPath() config = buildPath+ "../sim/dnode1/cfg/" tsql=self.newcur(host,config) tsql.execute("use %s" %dbname) pre_create = "create table" sql = pre_create count=int(count) tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) # print(time.time()) exeStartTime=time.time() # print(type(tcountStop),type(tcountStart)) for i in range(0,count): sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1) if i >0 and i%20000 == 0: # print(sql) tsql.execute(sql) sql = pre_create # print(time.time()) # end sql if sql != pre_create: # print(sql) tsql.execute(sql) exeEndTime=time.time() spendTime=exeEndTime-exeStartTime speedCreate=count/spendTime # tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate)) return def mutiThread_create_tables(self,host,dbname,stbname,vgroups,threadNumbers,count): buildPath = self.getBuildPath() config = buildPath+ "../sim/dnode1/cfg/" tsql=self.newcur(host,config) tdLog.debug("create database %s"%dbname) tsql.execute("drop database if exists %s"%dbname) tsql.execute("create database %s vgroups %d"%(dbname,vgroups)) tsql.execute("use %s" %dbname) count=int(count) threads = [] for i in range(threadNumbers): tsql.execute("create stable %s%d(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%(stbname,i)) threads.append(thd.Thread(target=self.create_tables, args=(host, dbname, stbname+"%d"%i, count,))) start_time = time.time() for tr in threads: tr.start() for tr in threads: tr.join() end_time = time.time() spendTime=end_time-start_time speedCreate=count/spendTime tdLog.debug("spent %.2fs to create %d stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,threadNumbers,threadNumbers*count,speedCreate)) return # def create_tables(self,host,dbname,stbname,vgroups,tcountStart,tcountStop): # insert data def insert_data(self, host, dbname, stbname, ts_start,rowCount): buildPath = self.getBuildPath() config = buildPath+ "../sim/dnode1/cfg/" tsql=self.newcur(host,config) tdLog.debug("ready to inser data") tsql.execute("use %s" %dbname) pre_insert = "insert into " sql = pre_insert tcount=int(tcount) allRows=tcount*rowCount tdLog.debug("doing insert data into stable-index:%s rows:%d ..."%(stbname, allRows)) exeStartTime=time.time() for i in range(0,tcount): sql += " %s_%d values "%(stbname,i) for j in range(rowCount): sql += "(%d, %d, 'taos_%d') "%(ts_start + j*1000, j, j) if j >0 and j%5000 == 0: # print(sql) tdSql.execute(sql) sql = "insert into %s_%d values " %(stbname,i) # end sql if sql != pre_insert: # print(sql) tdSql.execute(sql) exeEndTime=time.time() spendTime=exeEndTime-exeStartTime speedInsert=allRows/spendTime # tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert)) tdLog.debug("INSERT TABLE DATA ............ [OK]") return def mutiThread_insert_data(self, host, dbname, stbname, threadNumbers, ts_start, tcountStart,tcountStop,rowCount): buildPath = self.getBuildPath() config = buildPath+ "../sim/dnode1/cfg/" tsql=self.newcur(host,config) tdLog.debug("ready to inser data") tsql.execute("use %s" %dbname) pre_insert = "insert into " sql = pre_insert tcount=tcountStop-tcountStart allRows=tcount*rowCount tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbname, allRows)) exeStartTime=time.time() for i in range(tcountStart,tcountStop): sql += " %s_%d values "%(stbname,i) for j in range(rowCount): sql += "(%d, %d, 'taos_%d') "%(ts_start + j*1000, j, j) if j >0 and j%5000 == 0: # print(sql) tdSql.execute(sql) sql = "insert into %s_%d values " %(stbname,i) # end sql if sql != pre_insert: # print(sql) tdSql.execute(sql) exeEndTime=time.time() spendTime=exeEndTime-exeStartTime speedInsert=allRows/spendTime # tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert)) tdLog.debug("INSERT TABLE DATA ............ [OK]") buildPath = self.getBuildPath() config = buildPath+ "../sim/dnode1/cfg/" tsql=self.newcur(host,config) tsql.execute("use %s" %dbname) count=int(count) threads = [] for i in range(threadNumbers): tsql.execute("create stable %s%d(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%(stbname,i)) threads.append(thd.Thread(target=self.create_tables, args=(host, dbname, stbname+"%d"%i, count,))) start_time = time.time() for tr in threads: tr.start() for tr in threads: tr.join() end_time = time.time() spendTime=end_time-start_time speedCreate=count/spendTime tdLog.debug("spent %.2fs to create %d stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,threadNumbers,threadNumbers*count,speedCreate)) return def taosBench(self,jsonFile): buildPath = self.getBuildPath() if (buildPath == ""): tdLog.exit("taosd not found!") else: tdLog.info("taosd found in %s" % buildPath) taosBenchbin = buildPath+ "/build/bin/taosBenchmark" os.system("%s -f %s -y " %(taosBenchbin,jsonFile)) return def taosBenchCreate(self,host,dropdb,dbname,stbname,vgroups,processNumbers,count): # count=50000 buildPath = self.getBuildPath() config = buildPath+ "../sim/dnode1/cfg/" tsql=self.newcur(host,config) # insert: create one or mutiple tables per sql and insert multiple rows per sql tsql.execute("drop database if exists %s"%dbname) tsql.execute("create database %s vgroups %d"%(dbname,vgroups)) print("db has been created") # tsql.getResult("show databases") # print(tdSql.queryResult) tsql.execute("use %s" %dbname) threads = [] for i in range(processNumbers): jsonfile="1-insert/Vgroups%d%d.json"%(vgroups,i) os.system("cp -f 1-insert/manyVgroups.json %s"%(jsonfile)) os.system("sed -i 's/\"name\": \"db\",/\"name\": \"%s\",/g' %s"%(dbname,jsonfile)) os.system("sed -i 's/\"drop\": \"no\",/\"drop\": \"%s\",/g' %s"%(dropdb,jsonfile)) os.system("sed -i 's/\"host\": \"127.0.0.1\",/\"host\": \"%s\",/g' %s"%(host,jsonfile)) os.system("sed -i 's/\"childtable_count\": 10000,/\"childtable_count\": %d,/g' %s "%(count,jsonfile)) os.system("sed -i 's/\"name\": \"stb1\",/\"name\": \"%s%d\",/g' %s "%(stbname,i,jsonfile)) os.system("sed -i 's/\"childtable_prefix\": \"stb1_\",/\"childtable_prefix\": \"%s%d_\",/g' %s "%(stbname,i,jsonfile)) threads.append(mp.Process(target=self.taosBench, args=("%s"%jsonfile,))) start_time = time.time() for tr in threads: tr.start() for tr in threads: tr.join() end_time = time.time() spendTime=end_time-start_time speedCreate=count/spendTime tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate)) return # test case1 base def test_case1(self): tdLog.debug("-----create database and muti-thread create tables test------- ") #host,dbname,stbname,vgroups,threadNumbers,tcountStart,tcountStop self.mutiThread_create_tables(host="localhost",dbname="db2",stbname="stb2", vgroups=1, threadNumbers=5, count=10000) return # test case2 base:insert data def test_case2(self): tdLog.debug("-----muti-thread insert data test------- ") # drop database tdSql.execute("drop database if exists db1") tdSql.execute("drop database if exists db4") tdSql.execute("drop database if exists db6") tdSql.execute("drop database if exists db8") tdSql.execute("drop database if exists db12") tdSql.execute("drop database if exists db16") #create database and tables; tdSql.execute("create database db1 vgroups 1") self.create_tables("db1", "stb1", 1*100) self.insert_data("db1", "stb1", self.ts, 1*50,1*10000) return def test_case3(self): self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 8, 1*10000) # self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000) # self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000) # self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000) # self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000) return def test_case4(self): self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 2, 1*10) tdSql.execute("use db1;") tdSql.query("show dnodes;") dnodeId=tdSql.getData(0,0) print(dnodeId) tdSql.execute("create qnode on dnode %s"%dnodeId) tdSql.query("select max(c1) from stb10;") maxQnode=tdSql.getData(0,0) tdSql.query("select min(c1) from stb11;") minQnode=tdSql.getData(0,0) tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") unionQnode=tdSql.queryResult tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") unionallQnode=tdSql.queryResult # tdSql.query("show qnodes;") # qnodeId=tdSql.getData(0,0) tdSql.execute("drop qnode on dnode %s"%dnodeId) tdSql.execute("reset query cache") tdSql.query("select max(c1) from stb10;") tdSql.checkData(0, 0, "%s"%maxQnode) tdSql.query("select min(c1) from stb11;") tdSql.checkData(0, 0, "%s"%minQnode) tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") unionVnode=tdSql.queryResult assert unionQnode == unionVnode tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") unionallVnode=tdSql.queryResult assert unionallQnode == unionallVnode # tdSql.execute("create qnode on dnode %s"%dnodeId) # self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000) # self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000) # self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000) # self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000) # run case def run(self): # # test base case # self.test_case1() # tdLog.debug(" LIMIT test_case1 ............ [OK]") # test case # self.test_case2() # tdLog.debug(" LIMIT test_case2 ............ [OK]") # test case self.test_case3() tdLog.debug(" LIMIT test_case3 ............ [OK]") # # test qnode # self.test_case4() # tdLog.debug(" LIMIT test_case3 ............ [OK]") return # # add case with filename # tdCases.addWindows(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())