# restfulInsert.py
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import requests
import threading
import random
import time
import argparse

class RestfulInsert:
    """Multi-threaded client that creates tables and inserts rows over the TAOS REST SQL endpoint.

    Every SQL statement is POSTed to ``http://<host>:6041/rest/sql``. The
    child tables are named ``<dbname>.<tbNamePerfix><id>`` for ids in
    ``[0, tables)``. They are split into contiguous slices, one slice per
    worker thread, and the same split is used for table creation and for
    inserts.
    """

    # Longest SQL statement (in characters) this tool will send in one request.
    MAX_SQL_LENGTH = 1024 * 1024

    def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder,tablePerbatch):
        # HTTP Basic auth header; the token is base64("root:taosdata").
        self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
        self.url = "http://%s:6041/rest/sql" % host
        self.ts = startTimestamp
        self.dbname = dbname
        self.numOfThreads = threads
        self.numOfTables = tables
        self.recordsPerTable = records
        self.batchSize = batchSize
        self.tableNamePerfix = tbNamePerfix
        self.outOfOrder = outOfOrder
        self.tablePerbatch = tablePerbatch

    def _tableRange(self, threadID):
        """Return the ``range`` of table ids owned by worker ``threadID``.

        The tables are split evenly across threads. The last thread also
        takes the remainder, so every id in ``[0, numOfTables)`` belongs to
        exactly one thread.
        """
        tablesPerThread = self.numOfTables // self.numOfThreads
        first = tablesPerThread * threadID
        if threadID == self.numOfThreads - 1:
            last = self.numOfTables
        else:
            last = first + tablesPerThread
        return range(first, min(last, self.numOfTables))

    def _batches(self):
        """Yield ``(offset, size)`` pairs that split ``recordsPerTable`` rows into batches.

        Each batch has at most ``batchSize`` rows. Only the last batch can be
        smaller, and a table with fewer rows than ``batchSize`` gets one
        short batch.
        """
        for offset in range(0, self.recordsPerTable, self.batchSize):
            yield offset, min(self.batchSize, self.recordsPerTable - offset)

    def _rowValues(self, firstTs, count):
        """Return ``count`` concatenated ``(ts, f1, f2, f3)`` VALUES tuples.

        Timestamps are ``firstTs``, ``firstTs + 1``, and so on. The field
        values are random ints in [1, 100]. Rows are shuffled when
        ``outOfOrder`` is set.
        """
        values = ["(%d, %d, %d, %d)" % (firstTs + i, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100))
                  for i in range(count)]
        if self.outOfOrder:
            random.shuffle(values)
        return ''.join(values)

    def _post(self, sql):
        """POST one SQL statement and return True when the server answers HTTP 200.

        A non-200 response body, or a request exception, is printed and
        reported as False, so a single failed request does not end the
        calling thread.
        """
        try:
            response = requests.post(self.url, sql, headers = self.header)
        except requests.RequestException as e:
            print(e)
            return False
        if response.status_code != 200:
            print(response.content)
            return False
        return True

    def _runThreads(self, target):
        """Run ``target(threadID)`` on ``numOfThreads`` threads and wait for all of them."""
        threads = [threading.Thread(target=target, args=(i,)) for i in range(self.numOfThreads)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def createTable(self, threadID):
        """Create the child tables of ``<dbname>.meters`` owned by worker ``threadID``.

        Tags are ``(id, loc)``. Even ids get loc 'beijing' and odd ids get
        'shanghai'.
        """
        tableIDs = self._tableRange(threadID)
        print("create table %d to %d" % (tableIDs.start, tableIDs.stop - 1))
        for tableID in tableIDs:
            name = 'beijing' if tableID % 2 == 0 else 'shanghai'
            data = "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID, self.dbname, tableID, name)
            self._post(data)

    def insertData(self, threadID):
        """Insert ``recordsPerTable`` rows into every table owned by worker ``threadID``.

        Each request carries one batch (at most ``batchSize`` rows) for each
        of up to ``tablePerbatch`` tables. Row timestamps start at
        ``self.ts`` and increase by one per row. If a statement would exceed
        ``MAX_SQL_LENGTH`` characters, a message is printed and this thread
        stops. Other threads keep running.
        """
        print("thread %d started" % threadID)
        tableIDs = self._tableRange(threadID)
        groupSize = max(1, self.tablePerbatch)
        for g in range(0, len(tableIDs), groupSize):
            group = tableIDs[g:g + groupSize]
            for offset, count in self._batches():
                # Every table in a request gets the same timestamps for this batch.
                segments = ["%s.%s%d values%s" % (self.dbname, self.tableNamePerfix, tableID, self._rowValues(self.ts + offset, count))
                            for tableID in group]
                data = "insert into " + " ".join(segments)
                # Check the fully assembled statement, not just its prefix.
                if len(data) > self.MAX_SQL_LENGTH:
                    print('batch size is larger than 1M')
                    return
                startTime = time.time()
                if self._post(data):
                    print("inserted %d records, %.3f seconds" % (count * len(group), time.time() - startTime))

    def insertUnlimitedData(self, threadID):
        """Repeatedly insert ``batchSize`` rows into each table owned by ``threadID``; never returns.

        Each round continues the timestamps where the previous round
        stopped, starting from ``self.ts``.
        """
        print("thread %d started" % threadID)
        tableIDs = self._tableRange(threadID)
        count = 0
        while True:
            start = self.ts + count * self.batchSize
            count += 1
            for tableID in tableIDs:
                data = "insert into %s.%s%d values%s" % (self.dbname, self.tableNamePerfix, tableID, self._rowValues(start, self.batchSize))
                startTime = time.time()
                if self._post(data):
                    print("inserted %d records, %.3f seconds" % (self.batchSize, time.time() - startTime))

    def run(self):
        """Create the database, the ``meters`` super table and the child tables, then insert data.

        Table creation and insertion each run on ``numOfThreads`` threads,
        and each phase waits for all of its threads. When
        ``recordsPerTable`` is -1 the insert threads never finish. If the
        database or the super table cannot be created, nothing else is
        attempted.
        """
        if not self._post("create database if not exists %s" % self.dbname):
            print("failed to create database %s, aborting" % self.dbname)
            return
        if not self._post("create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname):
            print("failed to create table %s.meters, aborting" % self.dbname)
            return

        startTime = time.time()
        self._runThreads(self.createTable)
        print("creating %d tables takes %d seconds" % (self.numOfTables, (time.time() - startTime)))

        print("inserting data =======")
        worker = self.insertData if self.recordsPerTable != -1 else self.insertUnlimitedData
        startTime = time.time()
        self._runThreads(worker)
        print("inserting %s records takes %d seconds" % (self.numOfTables * self.recordsPerTable, (time.time() - startTime)))
parser = argparse.ArgumentParser(
    description='Create tables and insert test data through the TAOS RESTful interface.')
parser.add_argument(
    '-H',
    '--host-name',
    action='store',
    default='127.0.0.1',
    type=str,
    help='host name to be connected (default: 127.0.0.1)')
parser.add_argument(
    '-S',
    '--start-timestamp',
    action='store',
    default=1500000000000,
    type=int,
    help='insert data from timestamp (default: 1500000000000)')
parser.add_argument(
    '-d',
    '--db-name',
    action='store',
    default='test',
    type=str,
    help='Database name to be created (default: test)')
parser.add_argument(
    '-t',
    '--number-of-threads',
    action='store',
    default=10,
    type=int,
    help='Number of threads to create tables and insert data (default: 10)')
parser.add_argument(
    '-T',
    '--number-of-tables',
    action='store',
    default=10000,
    type=int,
    help='Number of tables to be created (default: 10000)')
parser.add_argument(
    '-r',
    '--number-of-records',
    action='store',
    default=10000,
    type=int,
    help='Number of records to be inserted into each table (default: 10000, -1 for unlimited records)')
parser.add_argument(
    '-s',
    '--batch-size',
    action='store',
    default=1000,
    type=int,
    help='Maximum number of records per table in one insert request (default: 1000)')
parser.add_argument(
    '-p',
    '--table-name-prefix',
    action='store',
    default='t',
    type=str,
    help='Prefix of the child table names (default: t)')
parser.add_argument(
    '-o',
    '--out-of-order',
    action='store_true',
    help='Shuffle the rows of each batch so timestamps arrive out of order (default: False)')
parser.add_argument(
    '-b',
    '--table-per-batch',
    action='store',
    default=1,
    type=int,
    help='Number of tables written by one insert request (default: 1)')


def main(argv=None):
    """Parse command-line options (``sys.argv`` when ``argv`` is None) and run the benchmark.

    Counts that would otherwise crash the worker threads are rejected with
    a usage error.
    """
    args = parser.parse_args(argv)
    if args.number_of_threads < 1:
        parser.error('--number-of-threads must be at least 1')
    if args.batch_size < 1:
        parser.error('--batch-size must be at least 1')
    if args.table_per_batch < 1:
        parser.error('--table-per-batch must be at least 1')
    ri = RestfulInsert(
            args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables,
            args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order, args.table_per_batch)
    ri.run()


if __name__ == "__main__":
    main()