###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import requests
import threading
import random
import time
import argparse

class RestfulInsert:
    """Create sub-tables of a ``meters`` super table and bulk-insert rows over REST.

    Every SQL statement is POSTed as the request body to
    ``http://<host>:6041/rest/sql``. Work is split across ``threads`` worker
    threads. Each thread owns a contiguous range of table IDs (see
    ``_tableRange``), and the last thread also takes the remainder when the
    table count does not divide evenly.
    """

    # Upper bound (in characters) on one statement built by insertnData.
    MAX_SQL_LENGTH = 1024 * 1024

    def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder, tablePerbatch):
        """Store the benchmark configuration; no network I/O happens here.

        Args:
            host: server host name; port 6041 is appended to build the URL.
            startTimestamp: first timestamp value written to every table.
            dbname: database holding the ``meters`` super table.
            threads: number of worker threads.
            tables: total number of sub-tables.
            records: rows per table, or -1 to insert forever.
            batchSize: rows per table in each INSERT statement.
            tbNamePerfix: sub-table name prefix (table N is ``<prefix>N``).
            outOfOrder: shuffle the rows inside each batch when true.
            tablePerbatch: tables combined into one INSERT (used when > 1).
        """
        # HTTP Basic credentials: base64 of "root:taosdata".
        self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
        self.url = "http://%s:6041/rest/sql" % host
        self.ts = startTimestamp
        self.dbname = dbname
        self.numOfThreads = threads
        self.numOfTables = tables
        self.recordsPerTable = records
        self.batchSize = batchSize
        self.tableNamePerfix = tbNamePerfix
        self.outOfOrder = outOfOrder
        self.tablePerbatch = tablePerbatch

    def _tableRange(self, threadID):
        """Return ``(firstTableID, tableCount)`` owned by worker ``threadID``.

        Tables are split evenly and the last thread also takes the remainder,
        so table IDs 0 .. numOfTables - 1 are each covered exactly once.
        """
        perThread = self.numOfTables // self.numOfThreads
        first = perThread * threadID
        if threadID == self.numOfThreads - 1:
            return first, self.numOfTables - first
        return first, perThread

    def _batchSizes(self):
        """Return the row count of each INSERT needed to write recordsPerTable rows.

        The result is full batches of batchSize rows, followed by one smaller
        batch for any remainder. It is empty when recordsPerTable <= 0.

        Raises:
            ValueError: if batchSize is not positive.
        """
        if self.batchSize <= 0:
            raise ValueError("batch size must be positive, got %d" % self.batchSize)
        full, rest = divmod(max(self.recordsPerTable, 0), self.batchSize)
        return [self.batchSize] * full + ([rest] if rest else [])

    def _rows(self, start, count):
        """Return ``count`` row literals with timestamps ``start`` .. ``start + count - 1``.

        Each row is ``(ts, f1, f2, f3)`` with random ints in [1, 100]. The rows
        are shuffled when outOfOrder is set.
        """
        rows = ["(%d, %d, %d, %d)" % (start + k, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100))
                for k in range(count)]
        if self.outOfOrder:
            random.shuffle(rows)
        return rows

    def _insertSql(self, tableIDs, start, count):
        """Build one INSERT writing ``count`` rows from ``start`` into every table in ``tableIDs``."""
        clauses = ["%s.%s%d values%s" % (self.dbname, self.tableNamePerfix, tableID, ''.join(self._rows(start, count)))
                   for tableID in tableIDs]
        return "insert into " + " ".join(clauses)

    def _post(self, sql):
        """POST ``sql`` to the REST endpoint, print the body of a non-200 reply, and return the response."""
        # Encode explicitly as UTF-8. Depending on the urllib3 / http.client
        # version, a str body may otherwise be sent as ISO-8859-1, which fails
        # on non-ASCII names.
        response = requests.post(self.url, sql.encode('utf-8'), headers=self.header)
        if response.status_code != 200:
            print(response.content)
        return response

    def _runThreads(self, target):
        """Run ``target(threadID)`` on numOfThreads threads and wait for all of them."""
        threads = [threading.Thread(target=target, args=(i,)) for i in range(self.numOfThreads)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def createTable(self, threadID):
        """Create this thread's sub-tables of ``<dbname>.meters``.

        Each table is tagged (tableID, location). The location alternates
        between 'beijing' and 'shanghai' by the parity of the table's own ID.
        """
        first, count = self._tableRange(threadID)
        print("create table %d to %d" % (first, first + count - 1))
        for tableID in range(first, first + count):
            name = 'beijing' if tableID % 2 == 0 else 'shanghai'
            self._post("create table if not exists %s.%s%d using %s.meters tags(%d, '%s')"
                       % (self.dbname, self.tableNamePerfix, tableID, self.dbname, tableID, name))

    def insertData(self, threadID):
        """Insert recordsPerTable rows into each of this thread's tables, one table per INSERT."""
        print("thread %d started" % threadID)
        first, count = self._tableRange(threadID)
        sizes = self._batchSizes()
        for tableID in range(first, first + count):
            offset = 0
            for size in sizes:
                self._post(self._insertSql([tableID], self.ts + offset, size))
                offset += size

    def insertnData(self, threadID):
        """Insert recordsPerTable rows per table, packing tablePerbatch tables into each INSERT.

        Prints the batch count and elapsed seconds for each group of tables.
        If a statement would exceed MAX_SQL_LENGTH characters, it prints a
        message and raises SystemExit(-1). When run as a thread target, that
        ends only the current thread.
        """
        print("thread %d started" % threadID)
        first, count = self._tableRange(threadID)
        end = first + count
        sizes = self._batchSizes()
        group = max(1, self.tablePerbatch)
        for groupStart in range(first, end, group):
            # Clamp to this thread's range: the following IDs belong to another thread.
            tableIDs = range(groupStart, min(groupStart + group, end))
            begin = time.time()
            offset = 0
            for size in sizes:
                sql = self._insertSql(tableIDs, self.ts + offset, size)
                # Check the complete statement, rows included.
                if len(sql) > self.MAX_SQL_LENGTH:
                    print('batch size is larger than 1M')
                    raise SystemExit(-1)
                self._post(sql)
                offset += size
            print('----------------', len(sizes), time.time() - begin)

    def insertUnlimitedData(self, threadID):
        """Insert batchSize rows into each of this thread's tables, forever (never returns).

        Each round advances the timestamps by batchSize, so rounds never reuse a timestamp.
        """
        print("thread %d started" % threadID)
        first, count = self._tableRange(threadID)
        rnd = 0
        while True:
            start = self.ts + rnd * self.batchSize
            rnd += 1
            for tableID in range(first, first + count):
                self._post(self._insertSql([tableID], start, self.batchSize))

    def run(self):
        """Create the database, super table and sub-tables, then run the insert workers.

        Blocks until every worker finishes, which never happens when
        recordsPerTable is -1.
        """
        self._post("create database if not exists %s" % self.dbname)
        self._post("create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname)

        startTime = time.time()
        self._runThreads(self.createTable)
        print("creating %d tables takes %d seconds" % (self.numOfTables, (time.time() - startTime)))

        print("inserting data =======")
        if self.recordsPerTable == -1:
            worker = self.insertUnlimitedData
        elif self.tablePerbatch > 1:
            # Only insertnData combines several tables per statement.
            worker = self.insertnData
        else:
            worker = self.insertData
        startTime = time.time()
        self._runThreads(worker)
        print("inserting %d records takes %d seconds" % (self.numOfTables * self.recordsPerTable, (time.time() - startTime)))


def buildParser():
    """Return the command-line parser for the RESTful insert benchmark."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-H',
        '--host-name',
        action='store',
        default='127.0.0.1',
        type=str,
        help='host name to be connected (default: 127.0.0.1)')
    parser.add_argument(
        '-S',
        '--start-timestamp',
        action='store',
        default=1500000000000,
        type=int,
        help='insert data from timestamp (default: 1500000000000)')
    parser.add_argument(
        '-d',
        '--db-name',
        action='store',
        default='test',
        type=str,
        help='Database name to be created (default: test)')
    parser.add_argument(
        '-t',
        '--number-of-threads',
        action='store',
        default=10,
        type=int,
        help='Number of threads to create tables and insert data (default: 10)')
    parser.add_argument(
        '-T',
        '--number-of-tables',
        action='store',
        default=1000,
        type=int,
        help='Number of tables to be created (default: 1000)')
    parser.add_argument(
        '-r',
        '--number-of-records',
        action='store',
        default=1000,
        type=int,
        help='Number of record to be created for each table  (default: 1000, -1 for unlimited records)')
    parser.add_argument(
        '-s',
        '--batch-size',
        action='store',
        default=1000,
        type=int,
        help='Number of records per table in each INSERT statement (default: 1000)')
    parser.add_argument(
        '-p',
        '--table-name-prefix',
        action='store',
        default='t',
        type=str,
        help='Prefix of the sub-table names (default: t)')
    parser.add_argument(
        '-o',
        '--out-of-order',
        action='store_true',
        help='The order of test data (default: False)')
    parser.add_argument(
        '-b',
        '--table-per-batch',
        action='store',
        default=1,
        type=int,
        help='Number of tables combined into one INSERT statement (default: 1)')
    return parser


parser = buildParser()

# Only run the benchmark when executed as a script, so importing this module
# does not parse sys.argv or start network I/O.
if __name__ == "__main__":
    args = parser.parse_args()
    ri = RestfulInsert(
        args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables,
        args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order, args.table_per_batch)
    ri.run()