提交 0b1b28ec 编写于 作者: sangshuduo's avatar sangshuduo

[TD-3218] <fix>: python taosdemo improvement.

上级 59239c6d
......@@ -14,6 +14,7 @@
# -*- coding: utf-8 -*-
import inspect
import sys
import getopt
import requests
......@@ -21,11 +22,33 @@ import json
import random
import time
import datetime
from multiprocessing import Manager, Pool, Lock
from multiprocessing import Manager, Lock, Process, current_process
from multipledispatch import dispatch
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
global verbose
global debug
global threads
def PrintLineno():
callerframerecord = inspect.stack()[1] # 0 represents this line
# 1 represents line at caller
frame = callerframerecord[0]
info = inspect.getframeinfo(frame)
print("LN%d" % info.lineno) # __LINE__ -> 13
def PrintFrame():
callerframerecord = inspect.stack()[1] # 0 represents this line
# 1 represents line at caller
frame = callerframerecord[0]
info = inspect.getframeinfo(frame)
print("FILE: %s" % info.filename) # __FILE__ -> Test.py
print("%s()" % info.function) # __FUNCTION__ -> Main
print("LN%d" % info.lineno) # __LINE__ -> 13
@dispatch(str, str)
def v_print(msg: str, arg: str):
if verbose:
......@@ -136,7 +159,12 @@ def query_func(process: int, thread: int, cmd: str):
def query_data_process(cmd: str):
# establish connection if native
if native:
v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir)
v_print(
"host:%s, user:%s passwd:%s configDir:%s ",
host,
user,
password,
configDir)
try:
conn = taos.connect(
host=host,
......@@ -171,11 +199,11 @@ def query_data_process(cmd: str):
else:
restful_execute(
host,
port,
user,
password,
cmd)
host,
port,
user,
password,
cmd)
if native:
cursor.close()
......@@ -256,41 +284,57 @@ def drop_databases():
(dbName, i))
def insert_func(process: int, thread: int):
v_print("%d process %d thread, insert_func ", process, thread)
def insert_func(process: int, thread: int, verbose: bool):
if verbose:
print("%d process %d thread, insert_func " % (process, thread))
PrintLineno()
# generate uuid
uuid_int = random.randint(0, numOfTb + 1)
PrintLineno()
uuid = "%s" % uuid_int
v_print("uuid is: %s", uuid)
PrintLineno()
if verbose:
print("uuid is: %s" % uuid)
PrintLineno()
# establish connection if native
if native:
v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir)
if verbose:
print(
"host:%s, user:%s passwd:%s configDir:%s " %
(host,
user,
password,
configDir))
try:
conn = taos.connect(
host=host,
user=user,
password=password,
config=configDir)
v_print("conn: %s", str(conn.__class__))
if verbose:
print("conn: %s" % str(conn.__class__))
except Exception as e:
print("Error: %s" % e.args[0])
sys.exit(1)
try:
cursor = conn.cursor()
v_print("cursor:%d %s", id(cursor), str(cursor.__class__))
if verbose:
print("cursor:%d %s" % (id(cursor), str(cursor.__class__)))
except Exception as e:
print("Error: %s" % e.args[0])
conn.close()
sys.exit(1)
v_print("numOfRec %d:", numOfRec)
if verbose:
print("numOfRec %d:" % numOfRec)
row = 0
while row < numOfRec:
v_print("row: %d", row)
if verbose:
print("row: %d" % row)
sqlCmd = ['INSERT INTO ']
try:
sqlCmd.append(
......@@ -307,9 +351,9 @@ def insert_func(process: int, thread: int):
for batchIter in range(0, batch):
sqlCmd.append("('%s', %f) " %
(
start_time +
datetime.timedelta(
milliseconds=batchIter),
start_time +
datetime.timedelta(
milliseconds=batchIter),
random.random()))
row = row + 1
if row >= numOfRec:
......@@ -321,23 +365,27 @@ def insert_func(process: int, thread: int):
cmd = ' '.join(sqlCmd)
if measure:
exec_start_time = datetime.datetime.now()
if verbose:
print("cmd: %s, length:%d" % (cmd, len(cmd)))
if native:
affectedRows = cursor.execute(cmd)
else:
restful_execute(
host, port, user, password, cmd)
if measure:
exec_start_time = datetime.datetime.now()
if measure:
exec_end_time = datetime.datetime.now()
exec_delta = exec_end_time - exec_start_time
v_print(
"consume %d microseconds",
exec_delta.microseconds)
if native:
affectedRows = cursor.execute(cmd)
if verbose:
print("affectedRows: %d" % affectedRows)
else:
restful_execute(
host, port, user, password, cmd)
v_print("cmd: %s, length:%d", cmd, len(cmd))
if measure:
exec_end_time = datetime.datetime.now()
exec_delta = exec_end_time - exec_start_time
if verbose:
print(
"consume %d microseconds" %
exec_delta.microseconds)
if native:
cursor.close()
......@@ -374,35 +422,49 @@ def create_tb():
(tbName, j))
def insert_data_process(lock, i: int, begin: int, end: int):
lock.acquire()
tasks = end - begin
v_print("insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks)
def insert_data_process(
lock,
i: int,
begin: int,
end: int,
threads: int,
verbose: int):
PrintLineno()
try:
# lock.acquire()
print(current_process().name + " acquire")
if (threads < (end - begin)):
for j in range(begin, end, threads):
tasks = end - begin
if verbose:
print(
"insert_data_process:%d table from %d to %d" %
(i, begin, end))
if (threads < (end - begin)):
for j in range(begin, end, threads):
with ThreadPoolExecutor(max_workers=threads) as executor:
k = end if ((j + threads) > end) else (j + threads)
workers = [
executor.submit(
insert_func,
i, n, verbose) for n in range(
j, k)]
wait(workers, return_when=ALL_COMPLETED)
else:
with ThreadPoolExecutor(max_workers=threads) as executor:
k = end if ((j + threads) > end) else (j + threads)
workers = [
executor.submit(
insert_func,
i,
n) for n in range(
j,
k)]
i, j, verbose) for j in range(
begin, end)]
wait(workers, return_when=ALL_COMPLETED)
else:
with ThreadPoolExecutor(max_workers=threads) as executor:
workers = [
executor.submit(
insert_func,
i,
j) for j in range(
begin,
end)]
wait(workers, return_when=ALL_COMPLETED)
except BaseException:
print(current_process().name + " lock failed")
lock.release()
finally:
if verbose:
print(current_process().name + " release")
# lock.release()
def query_db(i):
......@@ -462,9 +524,9 @@ def printConfig():
if __name__ == "__main__":
native = False
verbose = False
debug = False
native = False
measure = True
dropDbOnly = False
colsPerRecord = 3
......@@ -534,22 +596,23 @@ if __name__ == "__main__":
print('\t-P, --password <password> password, The password to use when connecting to the server. Default is \'taosdata\'.')
print('\t-l, --colsPerRec <number> num_of_columns_per_record, The number of columns per record. Default is 3.')
print(
'\t-d, --dbname <dbname> database, Destination database. Default is \'test\'.')
'\t-d, --dbname <dbname> database, Destination database. Default is \'test\'.')
print('\t-a, --replica <replications> replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.')
print(
'\t-m, --tbname <table prefix> table_prefix, Table prefix name. Default is \'t\'.')
'\t-m, --tbname <table prefix> table_prefix, Table prefix name. Default is \'t\'.')
print(
'\t-M, --stable flag, Use super table. Default is no')
'\t-M, --stable flag, Use super table. Default is no')
print(
'\t-s, --stbname <stable prefix> stable_prefix, STable prefix name. Default is \'st\'')
print('\t-Q, --query [NO|EACHTB|command] query, Execute query command. set \'EACHTB\' means select * from each table')
'\t-s, --stbname <stable prefix> stable_prefix, STable prefix name. Default is \'st\'')
print(
'\t-T, --threads <number> num_of_threads, The number of threads. Default is 1.')
'\t-Q, --query [NO|EACHTB|command] query, Execute query command. set \'EACHTB\' means select * from each table')
print(
'\t-C, --processes <number> num_of_processes, The number of threads. Default is 1.')
'\t-T, --threads <number> num_of_threads, The number of threads. Default is 1.')
print(
'\t-C, --processes <number> num_of_processes, The number of threads. Default is 1.')
print('\t-r, --batch <number> num_of_records_per_req, The number of records per request. Default is 1000.')
print(
'\t-t, --numOfTb <number> num_of_tables, The number of tables. Default is 1.')
'\t-t, --numOfTb <number> num_of_tables, The number of tables. Default is 1.')
print('\t-n, --numOfRec <number> num_of_records_per_table, The number of records per table. Default is 1.')
print('\t-c, --config <path> config_directory, Configuration directory. Default is \'/etc/taos/\'.')
print('\t-x, --inserOnly flag, Insert only flag.')
......@@ -559,7 +622,7 @@ if __name__ == "__main__":
print('\t-v, --verbose Print verbose output')
print('\t-g, --debug Print debug output')
print(
'\t-y, --skipPrompt Skip read key for continous test, default is not skip')
'\t-y, --skipPrompt Skip read key for continous test, default is not skip')
print('')
sys.exit(0)
......@@ -636,7 +699,6 @@ if __name__ == "__main__":
print("FATAL: number of records must be larger than 0")
sys.exit(1)
if key in ['-c', '--config']:
configDir = value
v_print("config dir: %s", configDir)
......@@ -679,7 +741,12 @@ if __name__ == "__main__":
# establish connection first if native
if native:
v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir)
v_print(
"host:%s, user:%s passwd:%s configDir:%s ",
host,
user,
password,
configDir)
try:
conn = taos.connect(
host=host,
......@@ -734,7 +801,7 @@ if __name__ == "__main__":
end_time = time.time()
print(
"Total time consumed {} seconds for create table.".format(
(end_time - start_time_begin)))
(end_time - start_time_begin)))
if native:
cursor.close()
......@@ -746,7 +813,6 @@ if __name__ == "__main__":
manager = Manager()
lock = manager.Lock()
pool = Pool(processes)
begin = 0
end = 0
......@@ -763,6 +829,7 @@ if __name__ == "__main__":
quotient,
remainder)
procs = []
for i in range(processes):
begin = end
......@@ -770,18 +837,28 @@ if __name__ == "__main__":
end = begin + quotient + 1
else:
end = begin + quotient
pool.apply_async(insert_data_process, args=(lock, i, begin, end,))
p = Process(
target=insert_data_process,
args=(
lock,
i,
begin,
end,
threads,
verbose))
procs.append(p)
p.start()
for p in procs:
p.join()
pool.close()
pool.join()
time.sleep(1)
if measure:
end_time = time.time()
print(
"Total time consumed {} seconds for insert data.".format(
(end_time - start_time)))
(end_time - start_time)))
# query data
if queryCmd != "NO":
......@@ -795,3 +872,4 @@ if __name__ == "__main__":
(end_time - start_time_begin)))
print("done")
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册