diff --git a/tests/perftest-scripts/perftest-query.sh b/tests/perftest-scripts/perftest-query.sh
index 8b9fa1a546eedc955b4b902a20731a4736d8f60d..cc44b224184bcd1ee8c55e769b0e3c6ca16a2012 100755
--- a/tests/perftest-scripts/perftest-query.sh
+++ b/tests/perftest-scripts/perftest-query.sh
@@ -73,6 +73,9 @@ function runQueryPerfTest {
 	python3 insert/insertFromCSVPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
 	python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
+
+	python3 perfbenchmark/joinPerformance.py | tee -a $PERFORMANCE_TEST_REPORT
+
 }
diff --git a/tests/pytest/perfbenchmark/joinPerformance.py b/tests/pytest/perfbenchmark/joinPerformance.py
new file mode 100644
index 0000000000000000000000000000000000000000..acd4f88c9b1a2df91f7e129ac87d57c9b04c7106
--- /dev/null
+++ b/tests/pytest/perfbenchmark/joinPerformance.py
@@ -0,0 +1,355 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+import taos
+import os
+import json
+import argparse
+import subprocess
+import datetime
+import re
+
+from multiprocessing import cpu_count
+# from util.log import *
+# from util.sql import *
+# from util.cases import *
+# from util.dnodes import *
+
+class JoinPerf:
+
+    def __init__(self, clearCache, dbName, keep):
+        self.clearCache = clearCache
+        self.dbname = dbName
+        self.drop = "yes"
+        self.keep = keep
+        self.host = "127.0.0.1"
+        self.user = "root"
+        self.password = "taosdata"
+        self.config = "/etc/taosperf"
+        self.conn = taos.connect(
+            host=self.host,
+            user=self.user,
+            password=self.password,
+            config=self.config)
+
+    # def init(self, conn, logSql):
+    #     tdLog.debug(f"start to execute {__file__}")
+    #     tdSql.init(conn.cursor())
+
+    def getBuildPath(self) -> str:
+        selfPath = os.path.dirname(os.path.realpath(__file__))
+
+        if "community" in selfPath:
+            projPath = selfPath[:selfPath.find("community")]
+        else:
+            projPath = selfPath[:selfPath.find("tests")]
+
+        # initialize so an empty string is returned when taosd is not found
+        buildPath = ""
+        for root, dirs, files in os.walk(projPath):
+            if "taosd" in files:
+                rootRealPath = os.path.dirname(os.path.realpath(root))
+                if "packaging" not in rootRealPath:
+                    buildPath = root[:len(root) - len("/debug/build/bin")]
+                    break
+        return buildPath
+
+    def getCfgDir(self) -> str:
+        return self.getBuildPath() + "/sim/dnode1/cfg"
+
+    # def initConnection(self):
+    #     return self.getCfgDir()
+
+    # def connectDB(self):
+    #     self.conn = taos.connect(
+    #         self.host,
+    #         self.user,
+    #         self.password,
+    #         self.getCfgDir())
+    #     return self.conn.cursor()
+
+    def dbinfocfg(self) -> dict:
+        return {
+            "name": self.dbname,
+            "drop": self.drop,
+            "replica": 1,
+            "days": 10,
+            "cache": 16,
+            "blocks": 8,
+            "precision": "ms",
+            "keep": self.keep,
+            "minRows": 100,
+            "maxRows": 4096,
+            "comp": 2,
+            "walLevel": 1,
+            "cachelast": 0,
+            "quorum": 1,
+            "fsync": 3000,
+            "update": 0
+        }
+
+    # def column_tag_count(self, **column_tag) -> list:
+    #     return [{"type": type, "count": count} for type, count in column_tag.items()]
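+    # type_check validates the keyword arguments passed to column_tag_count:
+    # each key must be a TDengine type name, and the value is a column count
+    # (int) or, for binary/nchar, a list/tuple of (count, length). On any
+    # violation an error string is returned instead of the wrapped result.
+    # A call it accepts, for illustration only:
+    #     self.column_tag_count(INT=2, BINARY=[509, 10])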
+    def type_check(func):
+        def wrapper(self, **kwargs):
+            num_types = ["int", "float", "bigint", "tinyint", "smallint", "double"]
+            str_types = ["binary", "nchar"]
+            for k, v in kwargs.items():
+                if k.lower() not in num_types and k.lower() not in str_types:
+                    return f"arg {k}: type name not allowed"
+                elif not isinstance(v, (int, list, tuple)):
+                    return f"value {v}: type not allowed, must be int, list or tuple"
+                elif k.lower() in num_types and not isinstance(v, int):
+                    return f"arg {k}: the count of a numeric type must be an int"
+                elif isinstance(v, (list, tuple)) and len(v) > 2:
+                    return f"arg {k} takes 1 to 2 values, but more than 2 were given"
+                elif isinstance(v, (list, tuple)) and not all(isinstance(_, int) for _ in v):
+                    return f"arg {k}: every value must be of type int"
+            return func(self, **kwargs)
+        return wrapper
+
+    @type_check
+    def column_tag_count(self, **column_tag) -> list:
+        init_column_tag = []
+        for k, v in column_tag.items():
+            if re.search(k, "int, float, bigint, tinyint, smallint, double", re.IGNORECASE):
+                init_column_tag.append({"type": k, "count": v})
+            elif re.search(k, "binary, nchar", re.IGNORECASE):
+                # binary/nchar columns carry a length; default to 8 if omitted
+                if isinstance(v, int):
+                    init_column_tag.append({"type": k, "count": v, "len": 8})
+                elif len(v) == 1:
+                    init_column_tag.append({"type": k, "count": v[0], "len": 8})
+                else:
+                    init_column_tag.append({"type": k, "count": v[0], "len": v[1]})
+        return init_column_tag
+
+    def stbcfg(self, stb: str, child_tab_count: int, prechildtab: str, columns: dict, tags: dict) -> dict:
+        return {
+            "name": stb,
+            "child_table_exists": "no",
+            "childtable_count": child_tab_count,
+            "childtable_prefix": prechildtab,
+            "auto_create_table": "no",
+            "data_source": "rand",
+            "insert_mode": "taosc",
+            "insert_rows": 50,
+            "multi_thread_write_one_tbl": "no",
+            "number_of_tbl_in_one_sql": 0,
+            "rows_per_tbl": 1,
+            "max_sql_len": 65480,
+            "disorder_ratio": 0,
+            "disorder_range": 1000,
+            "timestamp_step": 20000,
+            "start_timestamp": "1969-12-31 00:00:00.000",
+            "sample_format": "csv",
+            "sample_file": "./sample.csv",
+            "tags_file": "",
+            "columns": self.column_tag_count(**columns),
+            "tags": self.column_tag_count(**tags)
+        }
+
+    def createcfg(self, db: dict, stbs: list) -> dict:
+        return {
+            "filetype": "insert",
+            "cfgdir": self.config,
+            "host": self.host,
+            "port": 6030,
+            "user": self.user,
+            "password": self.password,
+            "thread_count": cpu_count(),
+            "thread_count_create_tbl": cpu_count(),
+            "result_file": "/tmp/insert_res.txt",
+            "confirm_parameter_prompt": "no",
+            "insert_interval": 0,
+            "num_of_records_per_req": 100,
+            "max_sql_len": 1024000,
+            "databases": [{
+                "dbinfo": db,
+                "super_tables": stbs
+            }]
+        }
+
+    def createinsertfile(self, db: dict, stbs: list) -> str:
+        date = datetime.datetime.now().strftime("%Y%m%d%H%M")
+        file_create_table = f"/tmp/insert_{date}.json"
+
+        with open(file_create_table, 'w') as f:
+            json.dump(self.createcfg(db, stbs), f)
+
+        return file_create_table
+
+    def querysqls(self, sql: str) -> list:
+        return [{"sql": sql, "result": ""}]
+
+    def querycfg(self, sql: str) -> dict:
+        return {
+            "filetype": "query",
+            "cfgdir": self.config,
+            "host": self.host,
+            "port": 6030,
+            "user": self.user,
+            "password": self.password,
+            "confirm_parameter_prompt": "yes",
+            "databases": self.dbname,
+            "specified_table_query": {
+                "query_interval": 0,
+                "concurrent": cpu_count(),
+                "sqls": self.querysqls(sql)
+            }
+        }
+
+    def createqueryfile(self, sql: str) -> str:
+        date = datetime.datetime.now().strftime("%Y%m%d%H%M")
+        file_query_table = f"/tmp/query_{date}.json"
+
+        with open(file_query_table, "w") as f:
+            json.dump(self.querycfg(sql), f)
+
+        return file_query_table
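+    # taosdemotable hands a generated JSON config to the taosdemo binary in
+    # the build tree. Insert configs run unattended (confirm_parameter_prompt
+    # is "no"); query configs set it to "yes", so the command is prefixed
+    # with `yes |` to answer the confirmation prompt.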
resultfile="/dev/null"): + taosdemopath = self.getBuildPath() + "/debug/build/bin" + with open(filepath,"r") as f: + filetype = json.load(f)["filetype"] + if filetype == "insert": + taosdemo_table_cmd = f"{taosdemopath}/taosdemo -f {filepath} > {resultfile} 2>&1" + else: + taosdemo_table_cmd = f"yes | {taosdemopath}/taosdemo -f {filepath} > {resultfile} 2>&1" + _ = subprocess.check_output(taosdemo_table_cmd, shell=True).decode("utf-8") + + def droptmpfile(self): + drop_file_cmd = "rm -f /tmp/query_* " + _ = subprocess.check_output(drop_file_cmd, shell=True).decode("utf-8") + drop_file_cmd = "rm -f querySystemInfo-*" + _ = subprocess.check_output(drop_file_cmd, shell=True).decode("utf-8") + drop_file_cmd = "rm -f /tmp/insert_* " + _ = subprocess.check_output(drop_file_cmd, shell=True).decode("utf-8") + + def run(self): + print("========== join on table schema performance ==========") + if self.clearCache == True: + # must be root permission + subprocess.check_output("echo 3 > /proc/sys/vm/drop_caches") + + # cfg = { + # 'enableRecordSql': '1' + # } + # tdDnodes.deploy(1, cfg) + + # tdLog.printNoPrefix("==========step1: create 1024 columns on different data type==========") + # db = self.dbinfocfg() + # stblist = [] + # # the supertable config for each type in column_tag_types + # column_tag_types = ["INT","FLOAT","BIGINT","TINYINT","SMALLINT","DOUBLE","BINARY","NCHAR"] + # for i in range(len(column_tag_types)): + # tagtype = { + # "INT": 0, + # "FLOAT": 0, + # "BIGINT": 0, + # "TINYINT": 0, + # "SMALLINT": 0, + # "DOUBLE": 0, + # "BINARY": 0, + # "NCHAR": 0 + # } + # columntype = tagtype.copy() + # tagtype["INT"] = 2 + # if column_tag_types[i] == "BINARY": + # columntype[column_tag_types[i]] = [509, 10] + # elif column_tag_types[i] == "NCHAR": + # columntype[column_tag_types[i]] = [350, 10] + # else: + # columntype[column_tag_types[i]] = 1021 + # supertable = self.stbcfg( + # stb=f"stb{i}", + # child_tab_count=2, + # prechildtab=f"t{i}", + # columns=columntype, + # tags=tagtype + # ) + # stblist.append(supertable) + # self.taosdemotable(self.createinsertfile(db=db, stbs=stblist)) + + # tdLog.printNoPrefix("==========step2: execute query operation==========") + # tdLog.printNoPrefix("==========execute query operation==========") + sqls = { + "nchar":"select * from t00,t01 where t00.ts=t01.ts" + } + for type, sql in sqls.items(): + result_file = f"/tmp/queryResult_{type}.log" + self.taosdemotable(self.createqueryfile(sql), resultfile=result_file) + if result_file: + # tdLog.printNoPrefix(f"execute type {type} sql, the sql is: {sql}") + print(f"execute type {type} sql, the sql is: {sql}") + max_sql_time_cmd = f''' + grep Spent {result_file} | awk 'NR==1{{max=$7;next}}{{max=max>$7?max:$7}}END{{print "Max=",max,"s"}}' + ''' + max_sql_time = subprocess.check_output(max_sql_time_cmd, shell=True).decode("UTF-8") + # tdLog.printNoPrefix(f"type: {type} sql time : {max_sql_time}") + print(f"type: {type} sql time : {max_sql_time}") + + min_sql_time_cmd = f''' + grep Spent {result_file} | awk 'NR==1{{min=$7;next}}{{min=min<$7?min:$7}}END{{print "Min=",min,"s"}}' + ''' + min_sql_time = subprocess.check_output(min_sql_time_cmd, shell=True).decode("UTF-8") + # tdLog.printNoPrefix(f"type: {type} sql time : {min_sql_time}") + print(f"type: {type} sql time : {min_sql_time}") + + avg_sql_time_cmd = f''' + grep Spent {result_file} |awk '{{sum+=$7}}END{{print "Average=",sum/NR,"s"}}' + ''' + avg_sql_time = subprocess.check_output(avg_sql_time_cmd, shell=True).decode("UTF-8") + # tdLog.printNoPrefix(f"type: 
+    def run(self):
+        print("========== join on table schema performance ==========")
+        if self.clearCache:
+            # requires root permission
+            subprocess.check_output("echo 3 > /proc/sys/vm/drop_caches", shell=True)
+
+        # cfg = {
+        #     'enableRecordSql': '1'
+        # }
+        # tdDnodes.deploy(1, cfg)
+
+        # tdLog.printNoPrefix("==========step1: create 1024 columns of different data types==========")
+        # db = self.dbinfocfg()
+        # stblist = []
+        # # the supertable config for each type in column_tag_types
+        # column_tag_types = ["INT","FLOAT","BIGINT","TINYINT","SMALLINT","DOUBLE","BINARY","NCHAR"]
+        # for i in range(len(column_tag_types)):
+        #     tagtype = {
+        #         "INT": 0,
+        #         "FLOAT": 0,
+        #         "BIGINT": 0,
+        #         "TINYINT": 0,
+        #         "SMALLINT": 0,
+        #         "DOUBLE": 0,
+        #         "BINARY": 0,
+        #         "NCHAR": 0
+        #     }
+        #     columntype = tagtype.copy()
+        #     tagtype["INT"] = 2
+        #     if column_tag_types[i] == "BINARY":
+        #         columntype[column_tag_types[i]] = [509, 10]
+        #     elif column_tag_types[i] == "NCHAR":
+        #         columntype[column_tag_types[i]] = [350, 10]
+        #     else:
+        #         columntype[column_tag_types[i]] = 1021
+        #     supertable = self.stbcfg(
+        #         stb=f"stb{i}",
+        #         child_tab_count=2,
+        #         prechildtab=f"t{i}",
+        #         columns=columntype,
+        #         tags=tagtype
+        #     )
+        #     stblist.append(supertable)
+        # self.taosdemotable(self.createinsertfile(db=db, stbs=stblist))
+
+        # tdLog.printNoPrefix("==========step2: execute query operation==========")
+        sqls = {
+            "nchar": "select * from t00,t01 where t00.ts=t01.ts"
+        }
+        for type, sql in sqls.items():
+            result_file = f"/tmp/queryResult_{type}.log"
+            self.taosdemotable(self.createqueryfile(sql), resultfile=result_file)
+            if os.path.exists(result_file):
+                # tdLog.printNoPrefix(f"execute type {type} sql, the sql is: {sql}")
+                print(f"execute type {type} sql, the sql is: {sql}")
+                max_sql_time_cmd = f'''
+                grep Spent {result_file} | awk 'NR==1{{max=$7;next}}{{max=max>$7?max:$7}}END{{print "Max=",max,"s"}}'
+                '''
+                max_sql_time = subprocess.check_output(max_sql_time_cmd, shell=True).decode("utf-8")
+                # tdLog.printNoPrefix(f"type: {type} sql time : {max_sql_time}")
+                print(f"type: {type} sql time : {max_sql_time}")
+
+                min_sql_time_cmd = f'''
+                grep Spent {result_file} | awk 'NR==1{{min=$7;next}}{{min=min<$7?min:$7}}END{{print "Min=",min,"s"}}'
+                '''
+                min_sql_time = subprocess.check_output(min_sql_time_cmd, shell=True).decode("utf-8")
+                # tdLog.printNoPrefix(f"type: {type} sql time : {min_sql_time}")
+                print(f"type: {type} sql time : {min_sql_time}")
+
+                avg_sql_time_cmd = f'''
+                grep Spent {result_file} | awk '{{sum+=$7}}END{{print "Average=",sum/NR,"s"}}'
+                '''
+                avg_sql_time = subprocess.check_output(avg_sql_time_cmd, shell=True).decode("utf-8")
+                # tdLog.printNoPrefix(f"type: {type} sql time : {avg_sql_time}")
+                print(f"type: {type} sql time : {avg_sql_time}")
+
+            # tdLog.printNoPrefix(f"==========type {type} sql is over==========")
+
+        # tdLog.printNoPrefix("==========query operation is over==========")
+
+        self.droptmpfile()
+        # tdLog.printNoPrefix("==========tmp file has been deleted==========")
+
+    # def stop(self):
+    #     tdSql.close()
+    #     tdLog.success(f"{__file__} successfully executed")
+
+# tdCases.addLinux(__file__, TDTestCase())
+# tdCases.addWindows(__file__, TDTestCase())
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '-r',
+        '--remove-cache',
+        action='store_true',
+        default=False,
+        help='clear cache before query (default: False)')
+    parser.add_argument(
+        '-d',
+        '--database-name',
+        action='store',
+        default='db',
+        type=str,
+        help='Database name to be created (default: db)')
+    parser.add_argument(
+        '-k',
+        '--keep-time',
+        action='store',
+        default=36500,
+        type=int,
+        help='Database keep parameter (default: 36500)')
+
+    args = parser.parse_args()
+    jointest = JoinPerf(args.remove_cache, args.database_name, args.keep_time)
+    jointest.run()
\ No newline at end of file