# restfulInsert.py
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import requests
import threading
import random
import time
import argparse

class RestfulInsert:
    """Multi-threaded load generator that writes test rows through a REST SQL endpoint.

    The tables ``<prefix>0 .. <prefix>(tables - 1)`` are split into contiguous
    ranges, one per worker thread.  Each worker first creates its tables as
    children of the ``<dbname>.meters`` super table and then fills them with
    rows of three random integers in ``[1, 100]``.

    Every SQL statement is sent as the body of an HTTP POST to
    ``http://<host>:6041/rest/sql``.
    """

    def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder):
        """Store the run configuration; no network traffic happens here.

        Args:
            host: Host name or address of the REST server.
            startTimestamp: Timestamp of the first row written to every table.
            dbname: Database that holds the super table and the child tables.
            threads: Number of worker threads.
            tables: Total number of child tables.
            records: Rows per table, or -1 to insert until interrupted.
            batchSize: Rows carried by one insert statement.
            tbNamePerfix: Prefix prepended to the numeric table id.
            outOfOrder: When true, the rows inside each insert statement are
                shuffled before the statement is sent.
        """
        # HTTP basic auth; the token is the base64 form of "root:taosdata".
        self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
        self.url = "http://%s:6041/rest/sql" % host
        self.ts = startTimestamp
        self.dbname = dbname
        self.numOfThreads = threads
        self.numOfTables = tables
        self.recordsPerTable = records
        self.batchSize = batchSize
        self.tableNamePerfix = tbNamePerfix
        self.outOfOrder = outOfOrder

    def _post(self, sql):
        """Send one SQL statement and print the response body unless the status is 200."""
        response = requests.post(self.url, sql, headers=self.header)
        if response.status_code != 200:
            print(response.content)
        return response

    def _tableRange(self, threadID):
        """Return the half-open ``(first, end)`` range of table ids owned by a thread.

        The ranges of all threads are contiguous and together cover every
        table id exactly once, even when the table count is not a multiple of
        the thread count.  When it is a multiple, thread ``i`` owns
        ``[i * n, (i + 1) * n)`` with ``n = tables // threads``.
        """
        first = threadID * self.numOfTables // self.numOfThreads
        end = (threadID + 1) * self.numOfTables // self.numOfThreads
        return first, end

    def _buildInsertSql(self, tableID, start, count):
        """Build one insert statement with ``count`` rows starting at timestamp ``start``.

        Rows are concatenated without a separator after ``values``.  They are
        shuffled first when ``outOfOrder`` is set.
        """
        rows = [
            "(%d, %d, %d, %d)" % (start + k, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100))
            for k in range(count)
        ]
        if self.outOfOrder:
            random.shuffle(rows)
        prefix = "insert into %s.%s%d values" % (self.dbname, self.tableNamePerfix, tableID)
        return prefix + "".join(rows)

    def _runInThreads(self, target):
        """Run ``target(threadID)`` on ``numOfThreads`` threads and wait for all of them."""
        workers = [threading.Thread(target=target, args=(i,)) for i in range(self.numOfThreads)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

    def createTable(self, threadID):
        """Create the child tables owned by ``threadID``.

        Each table is tagged with its own id and with a location that
        alternates by that id: 'beijing' for even ids, 'shanghai' for odd ids.
        """
        first, end = self._tableRange(threadID)
        print("create table %d to %d" % (first, end - 1))
        for tableID in range(first, end):
            # Derive the location from the table's own id so both values occur.
            name = 'beijing' if tableID % 2 == 0 else 'shanghai'
            self._post(
                "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')"
                % (self.dbname, self.tableNamePerfix, tableID, self.dbname, tableID, name))

    def insertData(self, threadID):
        """Insert exactly ``recordsPerTable`` rows into every table owned by ``threadID``.

        Rows are sent ``batchSize`` at a time; the last statement of a table
        carries the remainder when the row count is not a multiple of the
        batch size.
        """
        print("thread %d started" % threadID)
        first, end = self._tableRange(threadID)
        for tableID in range(first, end):
            for offset in range(0, self.recordsPerTable, self.batchSize):
                count = min(self.batchSize, self.recordsPerTable - offset)
                self._post(self._buildInsertSql(tableID, self.ts + offset, count))

    def insertUnlimitedData(self, threadID):
        """Insert batches into the tables owned by ``threadID`` until the process is stopped.

        Each round writes one batch to every owned table, then advances the
        starting timestamp by ``batchSize``.  Returns immediately when the
        thread owns no table, instead of spinning in an empty loop.
        """
        print("thread %d started" % threadID)
        first, end = self._tableRange(threadID)
        if first == end:
            return

        count = 0
        while True:
            start = self.ts + count * self.batchSize
            count = count + 1
            for tableID in range(first, end):
                self._post(self._buildInsertSql(tableID, start, self.batchSize))

    def run(self):
        """Create the database, super table and child tables, then insert the data.

        Table creation and insertion each run on ``numOfThreads`` threads and
        report their elapsed wall-clock time.  With ``recordsPerTable == -1``
        the insertion phase uses ``insertUnlimitedData`` and does not return
        while any worker owns at least one table.
        """
        self._post("create database if not exists %s" % self.dbname)
        self._post(
            "create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))"
            % self.dbname)

        startTime = time.time()
        self._runInThreads(self.createTable)
        print("createing %d tables takes %d seconds" % (self.numOfTables, (time.time() - startTime)))

        print("inserting data =======")
        if self.recordsPerTable != -1:
            worker = self.insertData
        else:
            worker = self.insertUnlimitedData
        startTime = time.time()
        self._runInThreads(worker)
        print("inserting %d records takes %d seconds" % (self.numOfTables * self.recordsPerTable, (time.time() - startTime)))

# Command-line entry point: parse the options, build the load generator and
# run it against the selected host.
parser = argparse.ArgumentParser()
parser.add_argument(
    '-H',
    '--host-name',
    action='store',
    default='127.0.0.1',
    type=str,
    help='host name to be connected (default: 127.0.0.1)')
parser.add_argument(
    '-S',
    '--start-timestamp',
    action='store',
    default=1500000000000,
    type=int,
    help='insert data from timestamp (default: 1500000000000)')
parser.add_argument(
    '-d',
    '--db-name',
    action='store',
    default='test',
    type=str,
    help='Database name to be created (default: test)')
parser.add_argument(
    '-t',
    '--number-of-threads',
    action='store',
    default=10,
    type=int,
    help='Number of threads to create tables and insert datas (default: 10)')
parser.add_argument(
    '-T',
    '--number-of-tables',
    action='store',
    default=1000,
    type=int,
    help='Number of tables to be created (default: 1000)')
parser.add_argument(
    '-r',
    '--number-of-records',
    action='store',
    default=1000,
    type=int,
    help='Number of record to be created for each table  (default: 1000, -1 for unlimited records)')
# The default is an int like the other numeric options, and the help text
# describes the batch size rather than repeating the --number-of-tables text.
parser.add_argument(
    '-s',
    '--batch-size',
    action='store',
    default=1000,
    type=int,
    help='Number of records per insert request (default: 1000)')
parser.add_argument(
    '-p',
    '--table-name-prefix',
    action='store',
    default='t',
    type=str,
    help='Prefix of the table names to be created (default: t)')
# store_true: the flag is False unless -o is given on the command line.
parser.add_argument(
    '-o',
    '--out-of-order',
    action='store_true',
    help='The order of test data (default: False)')

args = parser.parse_args()
ri = RestfulInsert(args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables, args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order)
ri.run()