restfulInsert.py 9.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import requests
import threading
import random
import time
P
Ping Xiao 已提交
18
import argparse
19 20

class RestfulInsert:
    """Multi-threaded insert benchmark against a TAOS REST endpoint.

    Every SQL statement is sent as the body of an HTTP POST to
    ``http://<host>:6041/rest/sql``. ``run`` creates the database and the
    ``meters`` super table, creates ``numOfTables`` sub tables and fills them
    with rows of three random ints in [1, 100], spreading the tables over
    ``numOfThreads`` worker threads.
    """

    # Largest statement (in characters) this client is willing to send.
    MAX_SQL_LENGTH = 1024 * 1024

    def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder, tablePerbatch):
        """Store the benchmark configuration; no request is sent here.

        :param host: server host name; requests go to port 6041.
        :param startTimestamp: timestamp of the first row written to each table.
        :param dbname: database to create and write into.
        :param threads: number of worker threads (must be >= 1).
        :param tables: number of sub tables to create.
        :param records: rows per table, or -1 to insert forever.
        :param batchSize: rows per table carried by one insert request.
        :param tbNamePerfix: prefix of the sub table names.
        :param outOfOrder: shuffle the rows of each batch before sending.
        :param tablePerbatch: number of tables combined into one insert request.
        """
        # HTTP Basic credentials: base64 of "root:taosdata".
        self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
        self.url = "http://%s:6041/rest/sql" % host
        self.ts = startTimestamp
        self.dbname = dbname
        self.numOfThreads = threads
        self.numOfTables = tables
        self.recordsPerTable = records
        self.batchSize = batchSize
        self.tableNamePerfix = tbNamePerfix
        self.outOfOrder = outOfOrder
        self.tablePerbatch = tablePerbatch

    def _post(self, sql):
        """POST one SQL statement and print the body of any non-200 response.

        :return: the ``requests`` response object.
        """
        response = requests.post(self.url, data=sql, headers=self.header)
        if response.status_code != 200:
            print(response.content)
        return response

    def _threadTableRange(self, threadID):
        """Return ``(firstTableID, count)`` for the tables owned by ``threadID``.

        Tables are split evenly and the last thread also takes the remainder,
        so every ID in ``[0, numOfTables)`` belongs to exactly one thread.
        """
        perThread = self.numOfTables // self.numOfThreads
        first = perThread * threadID
        if threadID == self.numOfThreads - 1:
            return first, self.numOfTables - first
        return first, perThread

    def _batches(self):
        """Return ``(rowOffset, rowCount)`` pairs covering ``recordsPerTable`` rows.

        Each batch holds at most ``batchSize`` rows, and a trailing partial
        batch carries the remainder. A non-positive ``recordsPerTable`` yields
        no batches.
        """
        total, step = self.recordsPerTable, self.batchSize
        return [(offset, min(step, total - offset)) for offset in range(0, total, step)]

    def _createTableSQL(self, tableID):
        """Return the CREATE TABLE statement for sub table ``tableID``.

        The ``loc`` tag alternates per table: 'beijing' for even IDs and
        'shanghai' for odd IDs.
        """
        location = 'beijing' if tableID % 2 == 0 else 'shanghai'
        return "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')" % (
            self.dbname, self.tableNamePerfix, tableID, self.dbname, tableID, location)

    def _buildInsertSQL(self, tableIDs, offset, count):
        """Build one INSERT statement that writes ``count`` rows to each table.

        Row timestamps run from ``ts + offset`` to ``ts + offset + count - 1``.
        The three value columns are random ints in [1, 100]. When
        ``outOfOrder`` is set, each table's rows are shuffled before joining.
        """
        segments = []
        for tableID in tableIDs:
            rows = ["(%d, %d, %d, %d)" % (self.ts + offset + row, random.randint(1, 100),
                                          random.randint(1, 100), random.randint(1, 100))
                    for row in range(count)]
            if self.outOfOrder:
                random.shuffle(rows)
            segments.append("%s.%s%d values%s" % (self.dbname, self.tableNamePerfix, tableID, ''.join(rows)))
        # Per-table clauses are separated by a space: "... values(...) db.t1 values(...)".
        return "insert into " + " ".join(segments)

    def _runThreads(self, target):
        """Run ``target(threadID)`` on ``numOfThreads`` threads and wait for all of them."""
        threads = [threading.Thread(target=target, args=(i,)) for i in range(self.numOfThreads)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def createTable(self, threadID):
        """Create the sub tables owned by ``threadID``."""
        first, count = self._threadTableRange(threadID)
        print("create table %d to %d" % (first, first + count - 1))
        for tableID in range(first, first + count):
            self._post(self._createTableSQL(tableID))

    def insertData(self, threadID):
        """Insert ``recordsPerTable`` rows into every table owned by ``threadID``.

        Tables are grouped ``tablePerbatch`` at a time. Each request carries
        one batch of rows for every table in the group. If a statement would
        exceed 1M characters, 'batch size is larger than 1M' is printed and
        this thread stops.
        """
        print("thread %d started" % threadID)
        first, count = self._threadTableRange(threadID)
        end = first + count
        batches = self._batches()
        groupSize = max(1, self.tablePerbatch)
        for groupStart in range(first, end, groupSize):
            # Never reach past this thread's range into another thread's tables.
            group = range(groupStart, min(groupStart + groupSize, end))
            for offset, rows in batches:
                sql = self._buildInsertSQL(group, offset, rows)
                if len(sql) > self.MAX_SQL_LENGTH:
                    print('batch size is larger than 1M')
                    # exit(-1) here only raised SystemExit in this worker thread,
                    # ending it silently while the others continued; return is explicit.
                    return
                self._post(sql)

    def insertUnlimitedData(self, threadID):
        """Keep inserting ``batchSize`` rows into each owned table, forever.

        Round ``n`` writes timestamps starting at ``ts + n * batchSize``, so
        rounds never overlap. Returns immediately if the thread owns no tables.
        """
        print("thread %d started" % threadID)
        first, count = self._threadTableRange(threadID)
        if count <= 0:
            # Nothing to write; avoid spinning in an empty infinite loop.
            return
        roundNo = 0
        while True:
            offset = roundNo * self.batchSize
            roundNo += 1
            for tableID in range(first, first + count):
                self._post(self._buildInsertSQL([tableID], offset, self.batchSize))

    def run(self):
        """Create the schema, create all tables in parallel, then insert data in parallel.

        Uses ``insertUnlimitedData`` (which never returns) when
        ``recordsPerTable`` is -1, and ``insertData`` otherwise.
        """
        self._post("create database if not exists %s" % self.dbname)
        self._post("create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname)

        startTime = time.time()
        self._runThreads(self.createTable)
        print("creating %d tables takes %d seconds" % (self.numOfTables, (time.time() - startTime)))

        print("inserting data =======")
        startTime = time.time()
        target = self.insertData if self.recordsPerTable != -1 else self.insertUnlimitedData
        self._runThreads(target)
        print("inserting %s records takes %d seconds" % (self.numOfTables * self.recordsPerTable, (time.time() - startTime)))
P
Ping Xiao 已提交
158 159 160 161 162 163 164 165
parser = argparse.ArgumentParser()
parser.add_argument(
    '-H',
    '--host-name',
    action='store',
    default='127.0.0.1',
    type=str,
    help='host name to be connected (default: 127.0.0.1)')
P
Ping Xiao 已提交
166 167 168 169 170 171 172
parser.add_argument(
    '-S',
    '--start-timestamp',
    action='store',
    default=1500000000000,
    type=int,
    help='insert data from timestamp (default: 1500000000000)')
P
Ping Xiao 已提交
173 174 175 176
parser.add_argument(
    '-d',
    '--db-name',
    action='store',
L
liuyq-617 已提交
177
    default='test',
P
Ping Xiao 已提交
178 179 180 181 182 183
    type=str,
    help='Database name to be created (default: test)')
parser.add_argument(
    '-t',
    '--number-of-threads',
    action='store',
L
liuyq-617 已提交
184
    default=10,
P
Ping Xiao 已提交
185 186 187 188 189 190
    type=int,
    help='Number of threads to create tables and insert datas (default: 10)')
parser.add_argument(
    '-T',
    '--number-of-tables',
    action='store',
L
liuyq-617 已提交
191
    default=10000,
P
Ping Xiao 已提交
192 193 194 195 196 197
    type=int,
    help='Number of tables to be created (default: 1000)')
parser.add_argument(
    '-r',
    '--number-of-records',
    action='store',
L
liuyq-617 已提交
198
    default=10000,
P
Ping Xiao 已提交
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
    type=int,
    help='Number of record to be created for each table  (default: 1000, -1 for unlimited records)')
parser.add_argument(
    '-s',
    '--batch-size',
    action='store',
    default='1000',
    type=int,
    help='Number of tables to be created (default: 1000)')
parser.add_argument(
    '-p',
    '--table-name-prefix',
    action='store',
    default='t',
    type=str,
    help='Number of tables to be created (default: 1000)')
parser.add_argument(
    '-o',
    '--out-of-order',
    action='store_true', 
    help='The order of test data (default: False)')
L
liuyq-617 已提交
220 221 222 223 224 225 226 227 228
parser.add_argument(
    '-b',
    '--table-per-batch',
    action='store', 
    default=1,
    type=int,
    help='the table per batch (default: 1)')


P
Ping Xiao 已提交
229 230

args = parser.parse_args()
L
liuyq-617 已提交
231 232 233
ri = RestfulInsert(
        args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables, 
        args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order, args.table_per_batch)
234
ri.run()