insertWithMoreVgroup.py 14.2 KB
Newer Older
haoranc's avatar
haoranc 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import sys
15
import os
16
import threading as thd
haoranc's avatar
haoranc 已提交
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
import multiprocessing as mp
from numpy.lib.function_base import insert
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import datetime as dt
import time
# Module-level constants.
WAITS = 5 # wait time in seconds; NOTE(review): not referenced in the visible part of this file

class TDTestCase:
    #
    # --------------- main frame -------------------
    #
33
    # Client-side settings; stored under the "clientCfg" key of updatecfgDict.
    clientCfgDict = {'queryproxy': '2', 'debugFlag': 143}

    # Configuration overrides exposed as a class attribute
    # (presumably read by the test framework -- confirm against the runner).
    updatecfgDict = {'debugFlag': 143, 'clientCfg': clientCfgDict}
haoranc's avatar
haoranc 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
    def caseDescription(self):
        '''
        Table creation and data insertion test cases;
        case1: multi-threaded creation of super/child tables followed by multi-threaded insert
        case2: single-connection creation of child tables and insert
        case3: creation and insert driven by taosBenchmark
        case4: query results compared with and without a qnode
        '''
        return None

    def getBuildPath(self):
        """Locate the build directory that holds the ``taosd`` binary.

        The project root is taken as the part of this file's directory that
        precedes the first "community" (preferred) or "tests" path marker, and
        that tree is walked looking for a file named ``taosd``. Matches whose
        parent directory path contains "packaging" are skipped.

        Returns:
            str: the matching directory with the length of "/build/bin"
            stripped from its end, or "" when no usable ``taosd`` is found
            (callers such as ``taosBench`` test for the empty string).
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))

        marker = "community" if "community" in selfPath else "tests"
        # partition() keeps the whole path when the marker is absent; slicing
        # with find() == -1 would silently drop the last character instead.
        projPath = selfPath.partition(marker)[0]

        for root, _dirs, files in os.walk(projPath):
            if "taosd" not in files:
                continue
            rootRealPath = os.path.dirname(os.path.realpath(root))
            if "packaging" in rootRealPath:
                continue
            return root[:len(root) - len("/build/bin")]

        # Nothing found: return an empty string rather than leaving the
        # result unbound (which raised UnboundLocalError).
        return ""

    # init
    def init(self, conn, logSql):
        """Prepare the case: bind the shared SQL helper and set the base timestamp.

        conn:   connection object; a cursor obtained from it is handed to tdSql.
        logSql: accepted for interface compatibility, not used by this method.
        """
        tdLog.debug("start to execute %s" % __file__)
        cursor = conn.cursor()
        tdSql.init(cursor)
        # Starting timestamp used for the rows inserted by the test cases.
        self.ts = 1500000000000

    # stop 
    def stop(self):
        """Close the shared SQL helper and log that this file finished."""
        doneMessage = "%s successfully executed" % __file__
        tdSql.close()
        tdLog.success(doneMessage)


78
    # --------------- case  -------------------
haoranc's avatar
haoranc 已提交
79 80 81 82 83 84 85 86 87 88

    def newcur(self,host,cfg):
        """Open a new connection to ``host`` and return a cursor on it.

        host: server host name or address passed to ``taos.connect``.
        cfg:  client configuration directory passed as ``config``.

        The connection uses user "root", password "taosdata" and port 6030.
        The cursor is printed before being returned.
        """
        connection = taos.connect(
            host=host,
            user="root",
            password="taosdata",
            config=cfg,
            port=6030,
        )
        cursor = connection.cursor()
        print(cursor)
        return cursor

89 90
    # create tables
    def create_tables(self,host,dbname,stbname,count):
        """Create ``count`` child tables of super table ``stbname`` in ``dbname``.

        A dedicated cursor is opened through ``newcur`` for this call. Child
        tables are named ``<stbname>_<i>`` (i from 0) with tag value ``i + 1``
        and are created with multi-table ``create table`` statements.

        Args:
            host: server host passed to ``newcur``.
            dbname: database to ``use`` before creating tables.
            stbname: name of the existing super table.
            count: number of child tables; converted with ``int()``.
        """
        # getBuildPath() returns a directory without a trailing separator, so
        # the "/" is required to form "<build>/../sim/dnode1/cfg/".
        config = self.getBuildPath() + "/../sim/dnode1/cfg/"

        tsql = self.newcur(host, config)
        tsql.execute("use %s" % dbname)

        pre_create = "create table"
        count = int(count)

        tdLog.debug("doing create one  stable %s and %d  child table in %s  ..." %(stbname, count ,dbname))

        clauses = []
        for i in range(count):
            clauses.append(" %s_%d using %s tags(%d)" % (stbname, i, stbname, i + 1))
            # Flush periodically so a single statement does not grow without bound.
            if i > 0 and i % 20000 == 0:
                tsql.execute(pre_create + "".join(clauses))
                clauses = []

        # Send whatever is left after the last periodic flush.
        if clauses:
            tsql.execute(pre_create + "".join(clauses))
        return

haoranc's avatar
haoranc 已提交
122
    def mutiThread_create_tables(self,host,dbname,stbname,vgroups,threadNumbers,childrowcount):
        """Recreate ``dbname`` and create child tables using one thread per super table.

        The database is dropped if it exists and created with ``vgroups``
        vgroups. For each i in ``range(threadNumbers)`` a super table
        ``<stbname><i>`` is created, and a thread running ``create_tables``
        creates its child tables. All threads are started, then joined, and
        the elapsed time and creation rate are logged.

        Args:
            host: server host passed to ``newcur`` and to the worker threads.
            dbname: database name.
            stbname: super table name prefix.
            vgroups: number of vgroups for the new database.
            threadNumbers: number of super tables / worker threads.
            childrowcount: number of child tables per super table (despite
                the name, this value is passed to ``create_tables`` as its
                ``count``); converted with ``int()``.
        """
        # getBuildPath() returns a directory without a trailing separator, so
        # the "/" is required to form "<build>/../sim/dnode1/cfg/".
        config = self.getBuildPath() + "/../sim/dnode1/cfg/"

        tsql = self.newcur(host, config)
        tdLog.debug("create database %s"%dbname)
        tsql.execute("drop database if exists %s"%dbname)
        tsql.execute("create database %s vgroups %d"%(dbname,vgroups))
        tsql.execute("use %s" %dbname)

        count = int(childrowcount)
        threads = []
        for i in range(threadNumbers):
            stable = "%s%d" % (stbname, i)
            tsql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)" % stable)
            threads.append(thd.Thread(target=self.create_tables, args=(host, dbname, stable, count)))

        start_time = time.time()
        for tr in threads:
            tr.start()
        for tr in threads:
            tr.join()
        spendTime = time.time() - start_time

        totalTables = threadNumbers * count
        # Guard against a zero interval on coarse clocks.
        speedCreate = totalTables / spendTime if spendTime > 0 else float("inf")
        tdLog.debug("spent %.2fs to create %d stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,threadNumbers,totalTables,speedCreate))

        return

    # def create_tables(self,host,dbname,stbname,vgroups,tcountStart,tcountStop):
haoranc's avatar
haoranc 已提交
149 150 151


    # insert data
152
    def insert_data(self, host, dbname, stbname, chilCount, ts_start, rowCount):
        """Insert ``rowCount`` rows into each of ``chilCount`` child tables.

        A dedicated cursor is opened through ``newcur`` for this call. Child
        tables are addressed as ``<stbname>_<i>`` (i from 0). Row j of every
        table is ``(ts_start + j*1000, j, 'taos_<j>')``. Rows are sent with
        multi-table ``insert into`` statements of at most 4000 rows each; a
        table clause is only emitted when it carries at least one row, so no
        statement ever contains an empty ``values`` list.

        Args:
            host: server host passed to ``newcur``.
            dbname: database to ``use`` before inserting.
            stbname: super table name (prefix of the child table names).
            chilCount: number of child tables; converted with ``int()``.
            ts_start: timestamp of the first row of every child table.
            rowCount: number of rows per child table.
        """
        # getBuildPath() returns a directory without a trailing separator, so
        # the "/" is required to form "<build>/../sim/dnode1/cfg/".
        config = self.getBuildPath() + "/../sim/dnode1/cfg/"
        tsql = self.newcur(host, config)
        tdLog.debug("ready to inser data")
        tsql.execute("use %s" % dbname)

        pre_insert = "insert into "
        batchRows = 4000  # maximum number of rows per insert statement
        chilCount = int(chilCount)
        allRows = chilCount * rowCount
        tdLog.debug("doing insert data into stable-index:%s rows:%d ..."%(stbname, allRows))

        exeStartTime = time.time()
        clauses = []   # "<table> values (...) (...)" fragments of the pending statement
        pending = 0    # rows accumulated in the pending statement
        for i in range(chilCount):
            tbname = "%s_%d" % (stbname, i)
            rows = []
            for j in range(rowCount):
                rows.append("(%d, %d, 'taos_%d')" % (ts_start + j*1000, j, j))
                pending += 1
                if pending >= batchRows:
                    clauses.append("%s values %s" % (tbname, " ".join(rows)))
                    tsql.execute(pre_insert + " ".join(clauses))
                    clauses = []
                    rows = []
                    pending = 0
            # Carry this table's remaining rows over into the pending statement.
            if rows:
                clauses.append("%s values %s" % (tbname, " ".join(rows)))

        # Send whatever is left after the last full batch.
        if clauses:
            sql = pre_insert + " ".join(clauses)
            print(len(sql))
            tsql.execute(sql)

        spendTime = time.time() - exeStartTime
        # Guard against a zero interval on coarse clocks.
        speedInsert = allRows / spendTime if spendTime > 0 else float("inf")
        tdLog.debug("spent %.2fs to INSERT  %d rows into %s , insert rate is  %.2f rows/s... [OK]"% (spendTime,allRows,stbname,speedInsert))
        return

184
    def mutiThread_insert_data(self, host, dbname, stbname, threadNumbers, chilCount, ts_start, childrowcount):
        """Insert data into several super tables in parallel and verify row counts.

        For each i in ``range(threadNumbers)`` a thread runs ``insert_data``
        on super table ``<stbname><i>``. After all threads have been joined,
        ``select count(*)`` on every super table is checked against
        ``chilCount * childrowcount`` through ``tdSql``, and the elapsed time
        and insert rate are logged.

        Args:
            host: server host passed to ``newcur`` and to the worker threads.
            dbname: database holding the super tables.
            stbname: super table name prefix.
            threadNumbers: number of super tables / worker threads.
            chilCount: child tables per super table; converted with ``int()``.
            ts_start: timestamp of the first row of every child table.
            childrowcount: rows inserted into each child table.
        """
        # getBuildPath() returns a directory without a trailing separator, so
        # the "/" is required to form "<build>/../sim/dnode1/cfg/".
        config = self.getBuildPath() + "/../sim/dnode1/cfg/"

        tsql = self.newcur(host, config)
        tdLog.debug("ready to inser data")

        # Raises early if the database does not exist, before any thread starts.
        tsql.execute("use %s" % dbname)
        chilCount = int(chilCount)

        threads = [
            thd.Thread(
                target=self.insert_data,
                args=(host, dbname, "%s%d" % (stbname, i), chilCount, ts_start, childrowcount),
            )
            for i in range(threadNumbers)
        ]
        start_time = time.time()
        for tr in threads:
            tr.start()
        for tr in threads:
            tr.join()
        spendTime = time.time() - start_time

        tableCounts = threadNumbers * chilCount
        stableRows = chilCount * childrowcount
        allRows = stableRows * threadNumbers
        # Guard against a zero interval on coarse clocks.
        speedInsert = allRows / spendTime if spendTime > 0 else float("inf")

        # Selecting the database once is enough for all count checks.
        tdSql.execute("use %s" % dbname)
        for i in range(threadNumbers):
            tdSql.query("select count(*) from %s%d"%(stbname,i))
            tdSql.checkData(0,0,stableRows)
        # The reported rate is rows per second (allRows / spendTime).
        tdLog.debug("spent %.2fs to insert %d rows  into %d stable and %d table,  speed is %.2f rows/s... [OK]"% (spendTime,allRows,threadNumbers,tableCounts,speedInsert))
        tdLog.debug("INSERT TABLE DATA ............ [OK]")

        return

218

219 220 221 222 223 224 225 226 227 228
    def taosBench(self,jsonFile):
        """Run the taosBenchmark binary from the build directory on ``jsonFile``.

        jsonFile: path of the benchmark configuration passed with ``-f``.

        The command is run through ``os.system``; its exit status is not
        inspected.
        """
        buildPath = self.getBuildPath()
        if (buildPath != ""):
            tdLog.info("taosd found in %s" % buildPath)
        else:
            tdLog.exit("taosd not found!")

        taosBenchbin = buildPath+ "/build/bin/taosBenchmark"
        command = "%s -f %s -y " %(taosBenchbin,jsonFile)
        os.system(command)

        return
haoranc's avatar
haoranc 已提交
229
    def taosBenchCreate(self,host,dropdb,dbname,stbname,vgroups,processNumbers,count):
        """Recreate ``dbname`` and populate it with parallel taosBenchmark runs.

        The database is dropped if it exists and created with ``vgroups``
        vgroups. For each i in ``range(processNumbers)`` the template
        ``1-insert/manyVgroups.json`` (relative to the current working
        directory) is copied to ``1-insert/Vgroups<vgroups><i>.json`` with the
        database name, drop flag, host, child table count, super table name
        and child table prefix substituted. A process running ``taosBench``
        on that file is then created. All processes are started, then joined,
        and the elapsed time is logged.

        Args:
            host: server host, also written into the benchmark config.
            dropdb: value written into the config's "drop" field.
            dbname: database name.
            stbname: super table name prefix; process i uses ``<stbname><i>``.
            vgroups: number of vgroups for the new database.
            processNumbers: number of benchmark processes / super tables.
            count: child tables per super table (integer).

        Raises:
            OSError: if the template cannot be read or a config file cannot
                be written.
        """
        # getBuildPath() returns a directory without a trailing separator, so
        # the "/" is required to form "<build>/../sim/dnode1/cfg/".
        config = self.getBuildPath() + "/../sim/dnode1/cfg/"
        tsql = self.newcur(host, config)

        tsql.execute("drop database if exists %s"%dbname)
        tsql.execute("create database %s vgroups %d"%(dbname,vgroups))
        print("db has been created")
        tsql.execute("use %s" %dbname)

        # Render the per-process configs in Python instead of shelling out to
        # cp/sed: it works on every platform this case is registered for and
        # does not pass the arguments through a shell.
        with open("1-insert/manyVgroups.json", encoding="utf-8", newline="") as fp:
            template = fp.read()

        processes = []
        for i in range(processNumbers):
            jsonfile = "1-insert/Vgroups%d%d.json"%(vgroups,i)
            # Applied in order; each pair is (template text, replacement).
            substitutions = (
                ('"name": "db",', '"name": "%s",' % dbname),
                ('"drop": "no",', '"drop": "%s",' % dropdb),
                ('"host": "127.0.0.1",', '"host": "%s",' % host),
                ('"childtable_count": 10000,', '"childtable_count": %d,' % count),
                ('"name": "stb1",', '"name":  "%s%d",' % (stbname, i)),
                ('"childtable_prefix": "stb1_",', '"childtable_prefix": "%s%d_",' % (stbname, i)),
            )
            content = template
            for old, new in substitutions:
                content = content.replace(old, new)
            with open(jsonfile, "w", encoding="utf-8", newline="") as fp:
                fp.write(content)
            processes.append(mp.Process(target=self.taosBench, args=(jsonfile,)))

        start_time = time.time()
        for proc in processes:
            proc.start()
        for proc in processes:
            proc.join()
        spendTime = time.time() - start_time

        # Each process creates its own super table with `count` child tables.
        totalTables = processNumbers * count
        # Guard against a zero interval on coarse clocks.
        speedCreate = totalTables / spendTime if spendTime > 0 else float("inf")
        tdLog.debug("spent %.2fs to create %d stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,processNumbers,totalTables,speedCreate))
        return
haoranc's avatar
haoranc 已提交
267 268
    # test case1 base 
    def test_case1(self):
        """Create database "db" with 5 super tables of 50 child tables each
        using threads, then insert 10 rows into every child table using threads."""
        tdLog.debug("-----create database and muti-thread create tables test------- ")
        # Arguments shared by the create and insert steps.
        shared = {
            "host": "localhost",
            "dbname": "db",
            "stbname": "stb",
            "threadNumbers": 5,
        }
        self.mutiThread_create_tables(vgroups=1, childrowcount=50, **shared)
        self.mutiThread_insert_data(chilCount=50, ts_start=self.ts, childrowcount=10, **shared)

        return

    # test case2 base:insert data
    def test_case2(self):
        """Create database "db1" with one super table and insert data.

        Drops the databases this case family uses, creates ``db1`` with one
        vgroup and super table ``stb1``, creates 100 child tables through
        ``create_tables`` and inserts 10000 rows into each of the first 50
        child tables through ``insert_data``.
        """
        tdLog.debug("-----muti-thread insert data test------- ")
        # drop database
        for name in ("db1", "db4", "db6", "db8", "db12", "db16"):
            tdSql.execute("drop database if exists %s" % name)

        # create database and tables
        host = "localhost"
        tdSql.execute("create database db1 vgroups 1")
        # create_tables() issues "create table ... using stb1", so the super
        # table must exist first; the schema matches the rows insert_data writes.
        tdSql.execute("create stable db1.stb1(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)")
        # Both helpers take the host first; insert_data expects
        # (host, dbname, stbname, chilCount, ts_start, rowCount).
        self.create_tables(host, "db1", "stb1", 1*100)
        self.insert_data(host, "db1", "stb1", 1*50, self.ts, 1*10000)
        return

296
    def test_case3(self):
        """Run taosBenchCreate against 127.0.0.1: database "db1", super table
        prefix "stb1", 1 vgroup, 8 processes, 10000 child tables each."""
        # (host, dropdb, dbname, stbname, vgroups, processNumbers, count)
        benchArgs = ("127.0.0.1", "no", "db1", "stb1", 1, 8, 1*10000)
        self.taosBenchCreate(*benchArgs)

        return

haoranc's avatar
haoranc 已提交
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
    def test_case4(self):
        """Check that query results are the same with and without a qnode.

        Data is generated with taosBenchCreate (2 processes, 10 child tables
        each). A qnode is created on the first dnode listed by "show dnodes"
        and four queries are run. The qnode is then dropped, the query cache
        reset, and the same queries are run again and compared with the
        earlier results.
        """
        self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 2, 1*10)

        # Queries executed both while the qnode exists and after it is dropped.
        maxSql = "select max(c1) from stb10;"
        minSql = "select min(c1) from stb11;"
        unionSql = "select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;"
        unionAllSql = "select c0,c1 from stb11_1 where (c0>1000) union all  select c0,c1 from stb11_1 where c0>2000;"

        tdSql.execute("use db1;")
        tdSql.query("show dnodes;")
        dnodeId = tdSql.getData(0,0)
        print(dnodeId)

        # Phase 1: results with a qnode present.
        tdSql.execute("create qnode on dnode %s"%dnodeId)
        tdSql.query(maxSql)
        maxWithQnode = tdSql.getData(0,0)
        tdSql.query(minSql)
        minWithQnode = tdSql.getData(0,0)
        tdSql.query(unionSql)
        unionWithQnode = tdSql.queryResult
        tdSql.query(unionAllSql)
        unionAllWithQnode = tdSql.queryResult

        # Phase 2: same queries after dropping the qnode.
        tdSql.execute("drop qnode on dnode %s"%dnodeId)
        tdSql.execute("reset query cache")
        tdSql.query(maxSql)
        tdSql.checkData(0, 0, "%s"%maxWithQnode)
        tdSql.query(minSql)
        tdSql.checkData(0, 0, "%s"%minWithQnode)
        tdSql.query(unionSql)
        unionWithoutQnode = tdSql.queryResult
        assert unionWithQnode == unionWithoutQnode
        tdSql.query(unionAllSql)
        unionAllWithoutQnode = tdSql.queryResult
        assert unionAllWithQnode == unionAllWithoutQnode


        # tdSql.execute("create qnode on dnode %s"%dnodeId)


        # self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000)

        # self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000)

        # self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000)
        # self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000)

349 350 351
    # run case   
    def run(self):
        """Execute the enabled test cases.

        Only test_case1 (multi-threaded table creation and insert) is run.
        test_case2, test_case3 (taosBenchmark) and test_case4 (qnode) are
        defined in this class but not invoked here.
        """
        # create database and tables, then insert data
        self.test_case1()
        tdLog.debug(" LIMIT test_case1 ............ [OK]")

        return None
haoranc's avatar
haoranc 已提交
371 372 373 374 375
#
# Register this case, keyed by this file's path, for both Windows and Linux.
# Each registration receives its own TDTestCase instance.
#
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())