提交 a966bb11 编写于 作者: J jiajingbin

add tools/schemalessInsertPerformance.py

上级 cf1d1cf9
...@@ -11,11 +11,8 @@ ...@@ -11,11 +11,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import traceback
import random import random
import string
from taos.error import LinesError from taos.error import LinesError
import datetime
import time import time
from copy import deepcopy from copy import deepcopy
import numpy as np import numpy as np
...@@ -24,34 +21,21 @@ from util.cases import * ...@@ -24,34 +21,21 @@ from util.cases import *
from util.sql import * from util.sql import *
from util.common import tdCom from util.common import tdCom
import threading import threading
import itertools
class TDTestCase: class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql) tdSql.init(conn.cursor(), logSql)
self._conn = conn self._conn = conn
self.lock = threading.Lock()
def genRandomTs(self):
year = random.randint(2000, 2021)
month = random.randint(10, 12)
day = random.randint(10, 29)
hour = random.randint(10, 24)
minute = random.randint(10, 59)
second = random.randint(10, 59)
m_second = random.randint(101, 199)
date_time = f'{year}-{month}-{day} {hour}:{minute}:{second}'
print(date_time)
timeArray = time.strptime(date_time, "%Y-%m-%d %H:%M:%S")
ts = int(time.mktime(timeArray))
print("------", ts)
# timestamp = time.mktime(datetime.datetime.strptime(date_time, "%Y-%m-%d %H:%M:%S.%f").timetuple())
return f'{ts}s'
def genMultiColStr(self, int_count=4, double_count=0, binary_count=0): def genMultiColStr(self, int_count=4, double_count=0, binary_count=0):
""" '''
genType must be tag/col related to self.getPerfSql()
""" :count = 4 ---> 4 int
:count = 1000 ---> 400 int 400 double 200 binary(128)
:count = 4000 ---> 1900 int 1900 double 200 binary(128)
'''
col_str = "" col_str = ""
if double_count == 0 and binary_count == 0: if double_count == 0 and binary_count == 0:
for i in range(0, int_count): for i in range(0, int_count):
...@@ -88,15 +72,23 @@ class TDTestCase: ...@@ -88,15 +72,23 @@ class TDTestCase:
return col_str return col_str
def genLongSql(self, int_count=4, double_count=0, binary_count=0, init=False): def genLongSql(self, int_count=4, double_count=0, binary_count=0, init=False):
'''
:init ---> stb insert line
'''
if init: if init:
tag_str = f'id="init",t0={random.randint(0, 65535)}i32,t1=\"{tdCom.getLongName(10, "letters")}\"' tag_str = f'id="init",t0={random.randint(0, 65535)}i32,t1=\"{tdCom.getLongName(10, "letters")}\"'
else: else:
tag_str = f'id="sub_{tdCom.getLongName(5, "letters")}_{tdCom.getLongName(5, "letters")}",t0={random.randint(0, 65535)}i32,t1=\"{tdCom.getLongName(10, "letters")}\"' tag_str = f'id="sub_{tdCom.getLongName(5, "letters")}_{tdCom.getLongName(5, "letters")}",t0={random.randint(0, 65535)}i32,t1=\"{tdCom.getLongName(10, "letters")}\"'
col_str = self.genMultiColStr(int_count, double_count, binary_count) col_str = self.genMultiColStr(int_count=int_count, double_count=double_count, binary_count=binary_count)
long_sql = 'stb' + ',' + tag_str + ' ' + col_str + '0' long_sql = 'stb' + ',' + tag_str + ' ' + col_str + '0'
return long_sql return long_sql
def getPerfSql(self, count=4, init=False): def getPerfSql(self, count=4, init=False):
'''
:count = 4 ---> 4 int
:count = 1000 ---> 400 int 400 double 200 binary(128)
:count = 4000 ---> 1900 int 1900 double 200 binary(128)
'''
if count == 4: if count == 4:
input_sql = self.genLongSql(init=init) input_sql = self.genLongSql(init=init)
elif count == 1000: elif count == 1000:
...@@ -105,193 +97,165 @@ class TDTestCase: ...@@ -105,193 +97,165 @@ class TDTestCase:
input_sql = self.genLongSql(1900, 1900, 200, init=init) input_sql = self.genLongSql(1900, 1900, 200, init=init)
return input_sql return input_sql
def tableGenerator(self, count=4, table_count=1000):
for i in range(table_count):
yield self.getPerfSql(count)
def genTableList(self, count=4, table_count=10000):
table_list = list()
for i in range(1, table_count+1):
table_list.append(self.getPerfSql(count))
return table_list
def splitTableList(self, count=4, thread_count=10, table_count=1000):
per_list_len = int(table_count/thread_count)
table_list = self.genTableList(count=count)
# ts = int(time.time())
list_of_group = zip(*(iter(table_list),) *per_list_len)
end_list = [list(i) for i in list_of_group] # i is a tuple
count = len(table_list) % per_list_len
end_list.append(table_list[-count:]) if count !=0 else end_list
return table_list, end_list
def rowsGenerator(self, end_list):
ts = int(time.time())
input_sql_list = list()
for elm_list in end_list:
for elm in elm_list:
for i in range(1, 10000):
ts -= 1
elm_new = self.replaceLastStr(elm, str(ts)) + 's'
input_sql_list.append(elm_new)
yield input_sql_list
# def insertRows(self, count=4, thread_count=10):
# table_list = self.splitTableList(count=count, thread_count=thread_count)[0]
# for
def replaceLastStr(self, str, new): def replaceLastStr(self, str, new):
'''
replace last element of str to new element
'''
list_ori = list(str) list_ori = list(str)
list_ori[-1] = new list_ori[-1] = new
return ''.join(list_ori) return ''.join(list_ori)
def genDataList(self, table_list, row_count=10):
data_list = list()
ts = int(time.time())
for table_str in table_list:
for i in range(1, row_count+1):
ts -= 1
table_str_new = self.replaceLastStr(table_str, f'{str(ts)}s')
data_list.append(table_str_new)
print(data_list)
return data_list
def createStb(self, count=4):
'''
create 1 stb
'''
input_sql = self.getPerfSql(count=count, init=True)
self._conn.insert_lines([input_sql])
def insertRows(self, count=4, table_count=1000): def batchCreateTable(self, batch_list):
table_generator = self.tableGenerator(count=count, table_count=table_count) '''
for table_name in table_generator: schemaless insert api
pass '''
self._conn.insert_lines(batch_list)
def perfTableInsert(self): # print(threading.current_thread().name, "length=", len(batch_list))
table_generator = self.tableGenerator() # print(threading.current_thread().name, 'firstline', batch_list[0])
for input_sql in table_generator: # print(threading.current_thread().name, 'lastline:', batch_list[-1])
self._conn.insert_lines([input_sql]) # print(threading.current_thread().name, 'end')
# for i in range(10):
# self._conn.insert_lines([input_sql]) def splitGenerator(self, table_list, sub_list_len):
'''
def perfDataInsert(self, count=4): split a list to n piece of sub_list
table_generator = self.tableGenerator(count=count) [a, b, c, d] ---> [[a, b], [c, d]]
ts = int(time.time()) yield type ---> generator
for input_sql in table_generator: '''
print("input_sql-----------", input_sql) for i in range(0, len(table_list), sub_list_len):
self._conn.insert_lines([input_sql]) yield table_list[i:i + sub_list_len]
for i in range(100000):
ts -= 1 def genTbListGenerator(self, table_list, sub_list_len):
input_sql_new = self.replaceLastStr(input_sql, str(ts)) + 's' '''
print("input_sql_new---------", input_sql_new) split table_list, after split, every sub_list len is sub_list_len
self._conn.insert_lines([input_sql_new]) '''
table_list_generator = self.splitGenerator(table_list, sub_list_len)
return table_list_generator
def batchInsertTable(self, batch_list): def genTableList(self, count=4, table_count=10000):
for insert_list in batch_list: '''
print(threading.current_thread().name, "length=", len(insert_list)) gen len(table_count) table_list
print(threading.current_thread().name, 'firstline', insert_list[0]) '''
print(threading.current_thread().name, 'lastline:', insert_list[-1]) table_list = list()
self._conn.insert_lines(insert_list) for i in range(table_count):
print(threading.current_thread().name, 'end') table_list.append(self.getPerfSql(count=count))
return table_list
def genTableThread(self, thread_count=10): def threadCreateTables(self, table_list_generator, thread_count=10):
'''
thread create tables
'''
threads = list() threads = list()
for i in range(thread_count): for i in range(thread_count):
t = threading.Thread(target=self.perfTableInsert) t = threading.Thread(target=self.batchCreateTable, args=(next(table_list_generator),))
threads.append(t) threads.append(t)
return threads return threads
def genMultiThread(self, count, thread_count=10): def batchInsertRows(self, table_list, rows_count):
'''
add rows in each table ---> count=rows_count
'''
for input_sql in table_list:
ts = int(time.time())
input_sql_list = list()
for i in range(rows_count-1):
ts -= 1
elm_new = self.replaceLastStr(input_sql, str(ts)) + 's'
input_sql_list.append(elm_new)
self.batchCreateTable(input_sql_list)
def threadsInsertRows(self, rows_generator, rows_count=1000, thread_count=10):
'''
multi insert rows in each table
'''
threads = list() threads = list()
for i in range(thread_count): for i in range(thread_count):
t = threading.Thread(target=self.perfDataInsert,args=(count,)) self.lock.acquire()
t = threading.Thread(target=self.batchInsertRows, args=(next(rows_generator), rows_count,))
threads.append(t) threads.append(t)
self.lock.release()
return threads return threads
def multiThreadRun(self, threads): def multiThreadRun(self, threads):
'''
multi run threads
'''
for t in threads: for t in threads:
t.start() t.start()
for t in threads: for t in threads:
t.join() t.join()
def createStb(self, count=4): def createTables(self, count, table_count=10000, sub_list_len=1000, thread_count=10):
input_sql = self.getPerfSql(count=count, init=True) '''
self._conn.insert_lines([input_sql]) create stb and tb
'''
def threadInsertTable(self, end_list, thread_count=10): table_list = self.genTableList(count=count, table_count=table_count)
threads = list() create_tables_start_time = time.time()
for i in range(thread_count):
t = threading.Thread(target=self.batchInsertTable, args=(end_list,))
threads.append(t)
return threads
def finalRun(self):
self.createStb() self.createStb()
table_list, end_list = self.splitTableList() table_list_generator = self.genTbListGenerator(table_list, sub_list_len)
batchInsertTableThread = self.threadInsertTable(end_list=end_list) create_tables_generator, insert_rows_generator = itertools.tee(table_list_generator, 2)
self.multiThreadRun(batchInsertTableThread) self.multiThreadRun(self.threadCreateTables(table_list_generator=create_tables_generator, thread_count=thread_count))
# print(end_list) create_tables_end_time = time.time()
create_tables_time = int(create_tables_end_time - create_tables_start_time)
# def createTb(self, count=4): return_str = f'create tables\' time of {count} columns ---> {create_tables_time}s'
# input_sql = self.getPerfSql(count=count) return insert_rows_generator, create_tables_time, return_str
# for i in range(10000):
# self._conn.insert_lines([input_sql]) def insertRows(self, count, rows_generator, rows_count=1000, thread_count=10):
'''
# def createTb1(self, count=4): insert rows
# start_time = time.time() '''
# self.multiThreadRun(self.genMultiThread(input_sql)) insert_rows_start_time = time.time()
# end_time = time.time() self.multiThreadRun(self.threadsInsertRows(rows_generator=rows_generator, rows_count=rows_count, thread_count=thread_count))
# return end_time - start_time insert_rows_end_time = time.time()
insert_rows_time = int(insert_rows_end_time - insert_rows_start_time)
return_str = f'insert rows\' time of {count} columns ---> {insert_rows_time}s'
return insert_rows_time, return_str
def schemalessPerfTest(self, count, table_count=10000, sub_list_len=1000, thread_count=10):
'''
get performance
'''
insert_rows_generator = self.createTables(count=count, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
return self.insertRows(count=count, rows_generator=insert_rows_generator, rows_count=1000, thread_count=10)
def getPerfResults(self, test_times=3, table_count=10000, sub_list_len=1000, thread_count=10):
col4_time = 0
col1000_time = 0
col4000_time = 0
# for i in range(test_times):
# time_used = self.schemalessPerfTest(count=4, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
# col4_time += time_used
# col4_time /= test_times
# print(col4_time)
# tdCom.cleanTb()
for i in range(test_times):
time_used = self.schemalessPerfTest(count=1000, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
col1000_time += time_used
col1000_time /= test_times
print(col1000_time)
# def calInsertTableTime(self): tdCom.cleanTb()
# start_time = time.time() for i in range(test_times):
# self.createStb() time_used = self.schemalessPerfTest(count=4000, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
# self.multiThreadRun(self.genMultiThread()) col4000_time += time_used
# end_time = time.time() col4000_time /= test_times
# return end_time - start_time print(col4000_time)
def calRunTime(self, count=4):
start_time = time.time()
self.createStb()
self.multiThreadRun(self.genMultiThread(count=count))
end_time = time.time()
return end_time - start_time
def calRunTime1(self, count=4):
start_time = time.time()
self.createStb()
self.multiThreadRun(self.perfTableInsert())
# self.perfTableInsert()
# def schemalessInsertPerfTest(self, count=4):
# input_sql = self.getPerfSql(count)
# self.calRunTime(input_sql)
# def test(self): return col4_time, col1000_time, col4000_time
# sql1 = 'stb,id="init",t0=14865i32,t1="tvnqbjuqck" c0=37i32,c1=217i32,c2=3i32,c3=88i32 1626006833640ms'
# sql2 = 'stb,id="init",t0=14865i32,t1="tvnqbjuqck" c0=38i32,c1=217i32,c2=3i32,c3=88i32 1626006833641ms'
# self._conn.insert_lines([sql1])
# self._conn.insert_lines([sql2])
def run(self): def run(self):
print("running {}".format(__file__)) print("running {}".format(__file__))
tdSql.prepare() tdSql.prepare()
self.finalRun() result = self.getPerfResults(test_times=3, table_count=1000, sub_list_len=100, thread_count=10)
# print(self.calRunTime1(count=4)) print(result)
# print(self.calRunTime(count=4))
# print(self.genRandomTs())
# self.calInsertTableTime()
# self.test()
# table_list = self.splitTableList()[0]
# data_list = self.genDataList(table_list)
# print(len(data_list))
# end_list = [['stb,id="sub_vzvfx_dbuxp",t0=9961i32,t1="zjjfayhfep" c0=83i32,c1=169i32,c2=177i32,c3=4i32 0','stb,id="sub_vzvfx_dbuxp",t0=9961i32,t1="zjjfayhfep" c0=83i32,c1=169i32,c2=177i32,c3=4i32 0'], ['stb,id="sub_vzvfx_dbuxp",t0=9961i32,t1="zjjfayhfep" c0=83i32,c1=169i32,c2=177i32,c3=4i32 0','stb,id="sub_vzvfx_dbuxp",t0=9961i32,t1="zjjfayhfep" c0=83i32,c1=169i32,c2=177i32,c3=4i32 0']]
# rowsGenerator = self.rowsGenerator(end_list)
# for i in rowsGenerator:
# print(i)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册