insertWithMoreVgroup.py 14.4 KB
Newer Older
haoranc's avatar
haoranc 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import sys
15
import os
16
import threading as thd
haoranc's avatar
haoranc 已提交
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
import multiprocessing as mp
from numpy.lib.function_base import insert
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import datetime as dt
import time
# constant define
WAITS = 5 # wait seconds

class TDTestCase:
    #
    # --------------- main frame -------------------
    #
33
    clientCfgDict = {'queryproxy': '1','debugFlag': 135}
haoranc's avatar
haoranc 已提交
34
    clientCfgDict["queryproxy"] = '2'
35 36
    clientCfgDict["debugFlag"] = 143

haoranc's avatar
haoranc 已提交
37
    updatecfgDict = {'clientCfg': {}}
38
    updatecfgDict = {'debugFlag': 143}
haoranc's avatar
haoranc 已提交
39
    updatecfgDict["clientCfg"]  = clientCfgDict
haoranc's avatar
haoranc 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
    def caseDescription(self):
        '''
        limit and offset keyword function test cases;
        case1: limit offset base function test
        case2: offset return valid
        '''
        return 

    def getBuildPath(self):
        selfPath = os.path.dirname(os.path.realpath(__file__))

        if ("community" in selfPath):
            projPath = selfPath[:selfPath.find("community")]
        else:
            projPath = selfPath[:selfPath.find("tests")]

        for root, dirs, files in os.walk(projPath):
wafwerar's avatar
wafwerar 已提交
57
            if ("taosd" in files or "taosd.exe" in files):
haoranc's avatar
haoranc 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if ("packaging" not in rootRealPath):
                    buildPath = root[:len(root)-len("/build/bin")]
                    break
        return buildPath

    # init
    def init(self, conn, logSql):
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        # tdSql.prepare()
        # self.create_tables();
        self.ts = 1500000000000

    # stop 
    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)


78
    # --------------- case  -------------------
haoranc's avatar
haoranc 已提交
79 80 81 82 83

    def newcur(self,host,cfg):
        user = "root"
        password = "taosdata"
        port =6030 
haoranc's avatar
haoranc 已提交
84
        con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
haoranc's avatar
haoranc 已提交
85 86 87 88
        cur=con.cursor()
        print(cur)
        return cur

89 90
    # create tables
    def create_tables(self,host,dbname,stbname,count):
haoranc's avatar
haoranc 已提交
91 92 93 94 95 96 97 98
        buildPath = self.getBuildPath()
        config = buildPath+ "../sim/dnode1/cfg/"
        
        tsql=self.newcur(host,config)
        tsql.execute("use %s" %dbname)

        pre_create = "create table"
        sql = pre_create
99
        count=int(count)
haoranc's avatar
haoranc 已提交
100 101 102 103 104

        tdLog.debug("doing create one  stable %s and %d  child table in %s  ..." %(stbname, count ,dbname))
        # print(time.time())
        exeStartTime=time.time()
        # print(type(tcountStop),type(tcountStart))
105
        for i in range(0,count):
haoranc's avatar
haoranc 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
            sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1)
            if i >0 and i%20000 == 0:
                # print(sql)
                tsql.execute(sql)
                sql = pre_create
        # print(time.time())
        # end sql        
        if sql != pre_create:
            # print(sql)
            tsql.execute(sql)
        exeEndTime=time.time()
        spendTime=exeEndTime-exeStartTime
        speedCreate=count/spendTime
        # tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
        return

haoranc's avatar
haoranc 已提交
122
    def mutiThread_create_tables(self,host,dbname,stbname,vgroups,threadNumbers,childcount):
123 124 125 126 127 128 129 130
        buildPath = self.getBuildPath()
        config = buildPath+ "../sim/dnode1/cfg/"
        
        tsql=self.newcur(host,config)
        tdLog.debug("create database %s"%dbname)
        tsql.execute("drop database if exists %s"%dbname)
        tsql.execute("create database %s vgroups %d"%(dbname,vgroups))
        tsql.execute("use %s" %dbname)
haoranc's avatar
haoranc 已提交
131
        count=int(childcount)
132 133 134 135 136 137 138 139 140 141 142
        threads = []
        for i in range(threadNumbers):
            tsql.execute("create stable %s%d(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%(stbname,i))
            threads.append(thd.Thread(target=self.create_tables, args=(host, dbname, stbname+"%d"%i, count,))) 
        start_time = time.time()
        for tr in threads:
            tr.start()
        for tr in threads:
            tr.join()
        end_time = time.time()
        spendTime=end_time-start_time
haoranc's avatar
haoranc 已提交
143
        speedCreate=threadNumbers*count/spendTime
144 145 146 147 148
        tdLog.debug("spent %.2fs to create %d stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,threadNumbers,threadNumbers*count,speedCreate))
        
        return

    # def create_tables(self,host,dbname,stbname,vgroups,tcountStart,tcountStop):
haoranc's avatar
haoranc 已提交
149 150 151


    # insert data
152
    def insert_data(self, host, dbname, stbname, chilCount, ts_start, rowCount):
153 154 155 156 157 158 159
        buildPath = self.getBuildPath()
        config = buildPath+ "../sim/dnode1/cfg/"
        tsql=self.newcur(host,config)
        tdLog.debug("ready to inser data")
        tsql.execute("use %s" %dbname)
        pre_insert = "insert into "
        sql = pre_insert
160 161
        chilCount=int(chilCount)
        allRows=chilCount*rowCount
162 163
        tdLog.debug("doing insert data into stable-index:%s rows:%d ..."%(stbname, allRows))
        exeStartTime=time.time()
164
        for i in range(0,chilCount):
165 166 167
            sql += " %s_%d values "%(stbname,i)
            for j in range(rowCount):
                sql += "(%d, %d, 'taos_%d') "%(ts_start + j*1000, j, j)
168
                if j >0 and j%4000 == 0:
169
                    # print(sql)
170
                    tsql.execute(sql)
171 172 173
                    sql = "insert into %s_%d values " %(stbname,i)
        # end sql        
        if sql != pre_insert:
haoranc's avatar
haoranc 已提交
174
            # print(sql)
175 176
            print(len(sql))
            tsql.execute(sql)
177 178 179
        exeEndTime=time.time()
        spendTime=exeEndTime-exeStartTime
        speedInsert=allRows/spendTime
180 181
        tdLog.debug("spent %.2fs to INSERT  %d rows into %s , insert rate is  %.2f rows/s... [OK]"% (spendTime,allRows,stbname,speedInsert))
        # tdLog.debug("INSERT TABLE DATA ............ [OK]")
182 183
        return

184
    def mutiThread_insert_data(self, host, dbname, stbname, threadNumbers, chilCount, ts_start, childrowcount):
185 186 187 188 189 190 191
        buildPath = self.getBuildPath()
        config = buildPath+ "../sim/dnode1/cfg/"
        
        tsql=self.newcur(host,config)
        tdLog.debug("ready to inser data")

        tsql.execute("use %s" %dbname)
192
        chilCount=int(chilCount)
193 194
        threads = []
        for i in range(threadNumbers):
195 196
            # tsql.execute("create stable %s%d(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%(stbname,i))
            threads.append(thd.Thread(target=self.insert_data, args=(host, dbname, stbname+"%d"%i, chilCount, ts_start, childrowcount,))) 
197 198 199 200 201 202 203
        start_time = time.time()
        for tr in threads:
            tr.start()
        for tr in threads:
            tr.join()
        end_time = time.time()
        spendTime=end_time-start_time
204 205 206 207 208 209 210 211 212 213 214 215
        tableCounts=threadNumbers*chilCount
        stableRows=chilCount*childrowcount
        allRows=stableRows*threadNumbers
        speedInsert=allRows/spendTime

        for i in range(threadNumbers):
            tdSql.execute("use %s" %dbname)
            tdSql.query("select count(*) from %s%d"%(stbname,i))
            tdSql.checkData(0,0,stableRows)
        tdLog.debug("spent %.2fs to insert %d rows  into %d stable and %d table,  speed is %.2f table/s... [OK]"% (spendTime,allRows,threadNumbers,tableCounts,speedInsert))
        tdLog.debug("INSERT TABLE DATA ............ [OK]")

haoranc's avatar
haoranc 已提交
216 217
        return

218

219 220 221 222 223 224 225 226 227 228
    def taosBench(self,jsonFile):
        buildPath = self.getBuildPath()
        if (buildPath == ""):
            tdLog.exit("taosd not found!")
        else:
            tdLog.info("taosd found in %s" % buildPath)
        taosBenchbin = buildPath+ "/build/bin/taosBenchmark"
        os.system("%s -f %s -y " %(taosBenchbin,jsonFile))
       
        return
haoranc's avatar
haoranc 已提交
229
    def taosBenchCreate(self,host,dropdb,dbname,stbname,vgroups,processNumbers,count):
230
        
231 232
        # count=50000
        buildPath = self.getBuildPath()
233 234
        config = buildPath+ "../sim/dnode1/cfg/"
        tsql=self.newcur(host,config)
235 236

        # insert: create one  or mutiple tables per sql and insert multiple rows per sql
237
        tsql.execute("drop database if exists %s"%dbname)
haoranc's avatar
haoranc 已提交
238

239 240 241 242 243
        tsql.execute("create database %s vgroups %d"%(dbname,vgroups))
        print("db has been created")
        # tsql.getResult("show databases")
        # print(tdSql.queryResult)
        tsql.execute("use %s" %dbname)
244 245
    
        threads = []
haoranc's avatar
haoranc 已提交
246
        for i in range(processNumbers):
247 248
            jsonfile="1-insert/Vgroups%d%d.json"%(vgroups,i)
            os.system("cp -f 1-insert/manyVgroups.json   %s"%(jsonfile))
P
plum-lihui 已提交
249
            os.system("sed -i 's/\"name\": \"db\",/\"name\": \"%s\",/g' %s"%(dbname,jsonfile))
250 251 252
            os.system("sed -i 's/\"drop\": \"no\",/\"drop\": \"%s\",/g' %s"%(dropdb,jsonfile))
            os.system("sed -i 's/\"host\": \"127.0.0.1\",/\"host\": \"%s\",/g' %s"%(host,jsonfile))
            os.system("sed -i 's/\"childtable_count\": 10000,/\"childtable_count\": %d,/g' %s "%(count,jsonfile))
253 254 255 256 257 258 259 260 261 262 263 264 265 266
            os.system("sed -i 's/\"name\": \"stb1\",/\"name\":  \"%s%d\",/g' %s "%(stbname,i,jsonfile))
            os.system("sed -i 's/\"childtable_prefix\": \"stb1_\",/\"childtable_prefix\": \"%s%d_\",/g' %s "%(stbname,i,jsonfile))
            threads.append(mp.Process(target=self.taosBench, args=("%s"%jsonfile,))) 
        start_time = time.time()
        for tr in threads:
            tr.start()
        for tr in threads:
            tr.join()
        end_time = time.time()

        spendTime=end_time-start_time
        speedCreate=count/spendTime
        tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
        return
haoranc's avatar
haoranc 已提交
267

haoranc's avatar
haoranc 已提交
268 269 270 271 272 273 274 275 276 277
    def checkData(self,dbname,stbname,stableCount,CtableCount,rowsPerSTable,):
        tdSql.execute("use %s"%dbname)
        tdSql.query("show stables")
        tdSql.checkRows(stableCount)
        tdSql.query("show tables")
        tdSql.checkRows(CtableCount)
        for i in range(stableCount):
            tdSql.query("select count(*) from %s%d"%(stbname,i))
            tdSql.checkData(0,0,rowsPerSTable)
        return 
haoranc's avatar
haoranc 已提交
278 279 280
     
     
    # test case1 base 
haoranc's avatar
haoranc 已提交
281
    def test_case1(self):
haoranc's avatar
haoranc 已提交
282 283 284 285 286 287 288 289 290 291 292 293
        #stableCount=threadNumbersCtb
        parameterDict = {'vgroups':        1,    \
                         'threadNumbersCtb': 5,  \
                         'threadNumbersIda': 5, \
                         'stableCount':   5,      \
                         'tablesPerStb':    50,  \
                         'rowsPerTable':    10,  \
                         'dbname':    'db',    \
                         'stbname':    'stb',   \
                         'host':  'localhost',    \
                         'startTs':    1640966400000}  # 2022-01-01 00:00:00.000
        
294 295
        tdLog.debug("-----create database and muti-thread create tables test------- ")
        #host,dbname,stbname,vgroups,threadNumbers,tcountStart,tcountStop
296
        #host, dbname, stbname, threadNumbers, chilCount, ts_start, childrowcount
haoranc's avatar
haoranc 已提交
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
        self.mutiThread_create_tables(
            host=parameterDict['host'],
            dbname=parameterDict['dbname'],
            stbname=parameterDict['stbname'], 
            vgroups=parameterDict['vgroups'], 
            threadNumbers=parameterDict['threadNumbersCtb'], 
            childcount=parameterDict['tablesPerStb'])

        self.mutiThread_insert_data(
            host=parameterDict['host'],
            dbname=parameterDict['dbname'],
            stbname=parameterDict['stbname'], 
            threadNumbers=parameterDict['threadNumbersIda'],
            chilCount=parameterDict['tablesPerStb'],
            ts_start=parameterDict['startTs'],
            childrowcount=parameterDict['rowsPerTable'])

        tableCount=parameterDict['threadNumbersCtb']*parameterDict['tablesPerStb']
        rowsPerStable=parameterDict['rowsPerTable']*parameterDict['tablesPerStb']

        self.checkData(dbname=parameterDict['dbname'],stbname=parameterDict['stbname'], stableCount=parameterDict['threadNumbersCtb'],CtableCount=tableCount,rowsPerSTable=rowsPerStable)
    
319
    def test_case3(self):
haoranc's avatar
haoranc 已提交
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
        #stableCount=threadNumbersCtb
        parameterDict = {'vgroups':        1,    \
                         'threadNumbersCtb': 8,  \
                         'stableCount':   5,      \
                         'tablesPerStb':    10,  \
                         'rowsPerTable':    100,  \
                         'dbname':    'db1',    \
                         'stbname':    'stb1',   \
                         'host':  'localhost',    \
                         'startTs':    1640966400000}  # 2022-01-01 00:00:00.000
                         
        self.taosBenchCreate(
            parameterDict['host'],
            "no",
            parameterDict['dbname'], 
            parameterDict['stbname'], 
            parameterDict['vgroups'],  
            parameterDict['threadNumbersCtb'], 
            parameterDict['tablesPerStb'])
        tableCount=parameterDict['threadNumbersCtb']*parameterDict['tablesPerStb']
        rowsPerStable=parameterDict['rowsPerTable']*parameterDict['tablesPerStb']

        self.checkData(
            dbname=parameterDict['dbname'],
            stbname=parameterDict['stbname'], 
            stableCount=parameterDict['threadNumbersCtb'],
            CtableCount=tableCount,
            rowsPerSTable=rowsPerStable)

349
        # self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000)
350

351
        # self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000)
352

P
plum-lihui 已提交
353 354 355
        # self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000)
        # self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000)

356 357
        return 

358 359 360
    # run case   
    def run(self):

haoranc's avatar
haoranc 已提交
361 362 363
        # create database and tables。
        self.test_case1()
        tdLog.debug(" LIMIT test_case1 ............ [OK]")
364

365 366 367
        # taosBenchmark:create database/table and insert data
        self.test_case3()
        tdLog.debug(" LIMIT test_case3 ............ [OK]")
368 369 370



haoranc's avatar
haoranc 已提交
371
        return 
haoranc's avatar
haoranc 已提交
372 373 374 375
#
# add case with filename
#
tdCases.addWindows(__file__, TDTestCase())
haoranc's avatar
haoranc 已提交
376
tdCases.addLinux(__file__, TDTestCase())