restfulInsert.py 6.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import requests
import threading
import random
import time
P
Ping Xiao 已提交
18
import argparse
19 20

class RestfulInsert:
P
Ping Xiao 已提交
21
    def __init__(self, host, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder):
22
        self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
P
Ping Xiao 已提交
23
        self.url = "http://%s:6041/rest/sql" % host
24
        self.ts = 1500000000000
P
Ping Xiao 已提交
25 26 27 28 29 30 31
        self.dbname = dbname
        self.numOfThreads = threads
        self.numOfTables = tables
        self.recordsPerTable = records
        self.batchSize = batchSize
        self.tableNamePerfix = tbNamePerfix
        self.outOfOrder = outOfOrder
32 33
    
    def createTable(self, threadID):
P
Ping Xiao 已提交
34
        tablesPerThread = int (self.numOfTables / self.numOfThreads)        
35 36 37 38
        print("create table %d to %d" % (tablesPerThread * threadID, tablesPerThread * (threadID + 1) - 1))
        for i in range(tablesPerThread):
            tableID = threadID * tablesPerThread
            name = 'beijing' if tableID % 2 == 0 else 'shanghai'
P
Ping Xiao 已提交
39
            data = "create table %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name)
40 41 42 43 44 45 46 47 48
            requests.post(self.url, data, headers = self.header)

    def insertData(self, threadID):        
        print("thread %d started" % threadID)
        tablesPerThread = int (self.numOfTables / self.numOfThreads)        
        for i in range(tablesPerThread):
            tableID = i + threadID * tablesPerThread
            start = self.ts
            for j in range(int(self.recordsPerTable / self.batchSize)):
P
Ping Xiao 已提交
49 50
                data = "insert into %s.%s%d values" % (self.dbname, self.tableNamePerfix, tableID)
                values = []
51
                for k in range(self.batchSize):
P
Ping Xiao 已提交
52 53 54 55 56 57
                    data +=  "(%d, %d, %d, %d)" %  (start + j * self.batchSize + k, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100))                                
                requests.post(self.url, data, headers = self.header)

    def insertUnlimitedData(self, threadID):        
        print("thread %d started" % threadID)
        tablesPerThread = int (self.numOfTables / self.numOfThreads)
58 59
        
        count = 0
P
Ping Xiao 已提交
60 61
        while True:
            i = 0
62 63
            start = self.ts  + count * self.batchSize          
            count = count + 1            
P
Ping Xiao 已提交
64 65 66 67 68 69 70
            
            for i in range(tablesPerThread):
                tableID = i + threadID * tablesPerThread
                                
                data = "insert into %s.%s%d values" % (self.dbname, self.tableNamePerfix, tableID)
                values = []
                for k in range(self.batchSize):
71
                    values.append("(%d, %d, %d, %d)" %  (start + k, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100)))
P
Ping Xiao 已提交
72 73 74 75 76 77 78 79
                
                if(self.outOfOrder == False):
                    for k in range(len(values)):            
                        data += values[k]
                else:
                    random.shuffle(values)
                    for k in range(len(values)):            
                        data += values[k]
80 81 82
                requests.post(self.url, data, headers = self.header)

    def run(self):
P
Ping Xiao 已提交
83
        data = "drop database if exists %s" % self.dbname
84
        requests.post(self.url, data, headers = self.header)
P
Ping Xiao 已提交
85
        data = "create database %s" % self.dbname
86
        requests.post(self.url, data, headers = self.header)
P
Ping Xiao 已提交
87
        data = "create table %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
        requests.post(self.url, data, headers = self.header)

        threads = []
        startTime = time.time()    
        for i in range(self.numOfThreads):
            thread = threading.Thread(target=self.createTable, args=(i,))
            thread.start()
            threads.append(thread)
        for i in range(self.numOfThreads):
            threads[i].join()
        print("createing %d tables takes %d seconds" % (self.numOfTables, (time.time() - startTime)))

        print("inserting data =======")
        threads = []
        startTime = time.time()
        for i in range(self.numOfThreads):
P
Ping Xiao 已提交
104 105 106 107
            if(self.recordsPerTable != -1):          
                thread = threading.Thread(target=self.insertData, args=(i,))
            else:
                thread = threading.Thread(target=self.insertUnlimitedData, args=(i,))
108 109 110 111 112 113 114
            thread.start()
            threads.append(thread)
        
        for i in range(self.numOfThreads):
            threads[i].join()
        print("inserting %d records takes %d seconds" % (self.numOfTables * self.recordsPerTable, (time.time() - startTime)))

P
Ping Xiao 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
parser = argparse.ArgumentParser()
parser.add_argument(
    '-H',
    '--host-name',
    action='store',
    default='127.0.0.1',
    type=str,
    help='host name to be connected (default: 127.0.0.1)')
parser.add_argument(
    '-d',
    '--db-name',
    action='store',
    default='test',
    type=str,
    help='Database name to be created (default: test)')
parser.add_argument(
    '-t',
    '--number-of-threads',
    action='store',
    default=10,
    type=int,
    help='Number of threads to create tables and insert datas (default: 10)')
parser.add_argument(
    '-T',
    '--number-of-tables',
    action='store',
    default=1000,
    type=int,
    help='Number of tables to be created (default: 1000)')
parser.add_argument(
    '-r',
    '--number-of-records',
    action='store',
    default=1000,
    type=int,
    help='Number of record to be created for each table  (default: 1000, -1 for unlimited records)')
parser.add_argument(
    '-s',
    '--batch-size',
    action='store',
    default='1000',
    type=int,
    help='Number of tables to be created (default: 1000)')
parser.add_argument(
    '-p',
    '--table-name-prefix',
    action='store',
    default='t',
    type=str,
    help='Number of tables to be created (default: 1000)')
parser.add_argument(
    '-o',
    '--out-of-order',
    action='store_true', 
    help='The order of test data (default: False)')

args = parser.parse_args()
ri = RestfulInsert(args.host_name, args.db_name, args.number_of_threads, args.number_of_tables, args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order)
173
ri.run()