未验证 提交 db8deb44 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #5831 from taosdata/master

Merge master into develop
......@@ -1063,7 +1063,7 @@ static void rand_string(char *str, int size) {
int n;
for (n = 0; n < size - 1; n++) {
int key = rand_tinyint() % (int)(sizeof(charset) - 1);
int key = abs(rand_tinyint()) % (int)(sizeof(charset) - 1);
str[n] = charset[key];
str[n] = 0;
......@@ -4799,6 +4799,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
if (interlaceRows > insertRows)
interlaceRows = insertRows;
if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR;
......@@ -4847,9 +4850,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
assert(pThreadInfo->ntables > 0);
if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR;
int batchPerTbl = interlaceRows;
int batchPerTblTimes;
......@@ -1878,6 +1878,17 @@ static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) {
assert(pMemRef->ref == 0 && pMemRef->snapshot.imem == NULL && pMemRef->snapshot.mem == NULL);
static void destroyTsComp(SQueryRuntimeEnv *pRuntimeEnv, SQuery *pQuery) {
if (isTsCompQuery(pQuery)) {
SColumnInfoData* pColInfoData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, 0);
FILE *f = *(FILE **)pColInfoData->pData; // TODO refactor
if (f) {
*(FILE **)pColInfoData->pData = NULL;
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo* pQInfo = (SQInfo*) pRuntimeEnv->qinfo;
......@@ -1896,6 +1907,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
destroyTsComp(pRuntimeEnv, pQuery);
pRuntimeEnv->pTsBuf = tsBufDestroy(pRuntimeEnv->pTsBuf);
......@@ -6863,6 +6876,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
*(FILE **)pColInfoData->pData = NULL;
// all data returned, set query over
......@@ -266,6 +266,7 @@ int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) {
size_t retVal = fwrite((char *)&(first->item), pMemBuffer->pageSize, 1, pMemBuffer->file);
if (retVal <= 0) { // failed to write to buffer, may be not enough space
ret = TAOS_SYSTEM_ERROR(errno);
pMemBuffer->pHead = first;
return ret;
......@@ -104,7 +104,7 @@ int32_t walAlter(void *handle, SWalCfg *pCfg) {
pWal->level = pCfg->walLevel;
pWal->fsyncPeriod = pCfg->fsyncPeriod;
pWal->fsyncSeq = pCfg->fsyncPeriod % 1000;
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
......@@ -66,7 +66,7 @@ int main(int argc, char *argv[]) {
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/);
for (int i = 0; i < 4000000; i++) {
for (int i = 0; i < 100; i++) {
Test(taos, qstr, i);
......@@ -40,8 +40,8 @@ function buildTDengine {
git remote update > /dev/null
git reset --hard HEAD
git checkout develop
REMOTE_COMMIT=`git rev-parse --short remotes/origin/develop`
git checkout master
REMOTE_COMMIT=`git rev-parse --short remotes/origin/master`
LOCAL_COMMIT=`git rev-parse --short @`
......@@ -73,6 +73,9 @@ function runQueryPerfTest {
python3 insert/insertFromCSVPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
python3 perfbenchmark/joinPerformance.py | tee -a $PERFORMANCE_TEST_REPORT
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
# -*- coding: utf-8 -*-
import taos
import os
import json
import argparse
import subprocess
import datetime
import re
from multiprocessing import cpu_count
# from util.log import *
# from util.sql import *
# from util.cases import *
# from util.dnodes import *
class JoinPerf:
def __init__(self, clearCache, dbName, keep):
self.clearCache = clearCache
self.dbname = dbName
self.drop = "yes"
self.keep = keep
self.host = ""
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taosperf"
self.conn = taos.connect(
# def init(self, conn, logSql):
# tdLog.debug(f"start to excute {__file__}")
# tdSql.init(conn.cursor())
def getBuildPath(self) -> str:
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/debug/build/bin")]
return buildPath
def getCfgDir(self) -> str:
return self.getBuildPath() + "/sim/dnode1/cfg"
# def initConnection(self):
# return self.getCfgDir()
# def connectDB(self):
# self.conn = taos.connect(
# self.host,
# self.user,
# self.password,
# self.getCfgDir())
# return self.conn.cursor()
def dbinfocfg(self) -> dict:
return {
"name": self.dbname,
"drop": self.drop,
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": self.keep,
"minRows": 100,
"maxRows": 4096,
"comp": 2,
"walLevel": 1,
"cachelast": 0,
"quorum": 1,
"fsync": 3000,
"update": 0
# def column_tag_count(self, **column_tag) -> list :
# return [{"type": type, "count": count} for type, count in column_tag.items()]
def type_check(func):
def wrapper(self, **kwargs):
num_types = ["int", "float", "bigint", "tinyint", "smallint", "double"]
str_types = ["binary", "nchar"]
for k ,v in kwargs.items():
if k.lower() not in num_types and k.lower() not in str_types:
return f"args {k} type error, not allowed"
elif not isinstance(v, (int, list, tuple)):
return f"value {v} type error, not allowed"
elif k.lower() in num_types and not isinstance(v, int):
return f"arg {v} takes 1 positional argument must be type int "
elif isinstance(v, (list,tuple)) and len(v) > 2:
return f"arg {v} takes from 1 to 2 positional arguments but more than 2 were given "
elif isinstance(v,(list,tuple)) and [ False for _ in v if not isinstance(_, int) ]:
return f"arg {v} takes from 1 to 2 positional arguments must be type int "
return func(self, **kwargs)
return wrapper
def column_tag_count(self, **column_tag) -> list :
init_column_tag = []
for k, v in column_tag.items():
if re.search(k, "int, float, bigint, tinyint, smallint, double", re.IGNORECASE):
init_column_tag.append({"type": k, "count": v})
elif re.search(k, "binary, nchar", re.IGNORECASE):
if isinstance(v, int):
init_column_tag.append({"type": k, "count": v, "len":8})
elif len(v) == 1:
init_column_tag.append({"type": k, "count": v[0], "len": 8})
init_column_tag.append({"type": k, "count": v[0], "len": v[1]})
return init_column_tag
def stbcfg(self, stb: str, child_tab_count: int, prechildtab: str, columns: dict, tags: dict) -> dict:
return {
"name": stb,
"child_table_exists": "no",
"childtable_count": child_tab_count,
"childtable_prefix": prechildtab,
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 50,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"rows_per_tbl": 1,
"max_sql_len": 65480,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 20000,
"start_timestamp": "1969-12-31 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": self.column_tag_count(**columns),
"tags": self.column_tag_count(**tags)
def createcfg(self,db: dict, stbs: list) -> dict:
return {
"filetype": "insert",
"cfgdir": self.config,
"host": self.host,
"port": 6030,
"user": self.user,
"password": self.password,
"thread_count": cpu_count(),
"thread_count_create_tbl": cpu_count(),
"result_file": "/tmp/insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": db,
"super_tables": stbs
def createinsertfile(self,db: dict, stbs: list) -> str:
date = datetime.datetime.now().strftime("%Y%m%d%H%M")
file_create_table = f"/tmp/insert_{date}.json"
with open(file_create_table, 'w') as f:
json.dump(self.createcfg(db, stbs), f)
return file_create_table
def querysqls(self, sql: str) -> list:
return [{"sql":sql,"result":""}]
def querycfg(self, sql: str) -> dict:
return {
"filetype": "query",
"cfgdir": self.config,
"host": self.host,
"port": 6030,
"user": self.user,
"password": self.password,
"confirm_parameter_prompt": "yes",
"databases": "db",
"specified_table_query": {
"query_interval": 0,
"concurrent": cpu_count(),
"sqls": self.querysqls(sql)
def createqueryfile(self, sql: str):
date = datetime.datetime.now().strftime("%Y%m%d%H%M")
file_query_table = f"/tmp/query_{date}.json"
with open(file_query_table,"w") as f:
json.dump(self.querycfg(sql), f)
return file_query_table
def taosdemotable(self, filepath, resultfile="/dev/null"):
taosdemopath = self.getBuildPath() + "/debug/build/bin"
with open(filepath,"r") as f:
filetype = json.load(f)["filetype"]
if filetype == "insert":
taosdemo_table_cmd = f"{taosdemopath}/taosdemo -f {filepath} > {resultfile} 2>&1"
taosdemo_table_cmd = f"yes | {taosdemopath}/taosdemo -f {filepath} > {resultfile} 2>&1"
_ = subprocess.check_output(taosdemo_table_cmd, shell=True).decode("utf-8")
def droptmpfile(self):
drop_file_cmd = "rm -f /tmp/query_* "
_ = subprocess.check_output(drop_file_cmd, shell=True).decode("utf-8")
drop_file_cmd = "rm -f querySystemInfo-*"
_ = subprocess.check_output(drop_file_cmd, shell=True).decode("utf-8")
drop_file_cmd = "rm -f /tmp/insert_* "
_ = subprocess.check_output(drop_file_cmd, shell=True).decode("utf-8")
def run(self):
print("========== join on table schema performance ==========")
if self.clearCache == True:
# must be root permission
subprocess.check_output("echo 3 > /proc/sys/vm/drop_caches")
# cfg = {
# 'enableRecordSql': '1'
# }
# tdDnodes.deploy(1, cfg)
# tdLog.printNoPrefix("==========step1: create 1024 columns on different data type==========")
# db = self.dbinfocfg()
# stblist = []
# # the supertable config for each type in column_tag_types
# column_tag_types = ["INT","FLOAT","BIGINT","TINYINT","SMALLINT","DOUBLE","BINARY","NCHAR"]
# for i in range(len(column_tag_types)):
# tagtype = {
# "INT": 0,
# "FLOAT": 0,
# "BIGINT": 0,
# "TINYINT": 0,
# "SMALLINT": 0,
# "DOUBLE": 0,
# "BINARY": 0,
# "NCHAR": 0
# }
# columntype = tagtype.copy()
# tagtype["INT"] = 2
# if column_tag_types[i] == "BINARY":
# columntype[column_tag_types[i]] = [509, 10]
# elif column_tag_types[i] == "NCHAR":
# columntype[column_tag_types[i]] = [350, 10]
# else:
# columntype[column_tag_types[i]] = 1021
# supertable = self.stbcfg(
# stb=f"stb{i}",
# child_tab_count=2,
# prechildtab=f"t{i}",
# columns=columntype,
# tags=tagtype
# )
# stblist.append(supertable)
# self.taosdemotable(self.createinsertfile(db=db, stbs=stblist))
# tdLog.printNoPrefix("==========step2: execute query operation==========")
# tdLog.printNoPrefix("==========execute query operation==========")
sqls = {
"nchar":"select * from t00,t01 where t00.ts=t01.ts"
for type, sql in sqls.items():
result_file = f"/tmp/queryResult_{type}.log"
self.taosdemotable(self.createqueryfile(sql), resultfile=result_file)
if result_file:
# tdLog.printNoPrefix(f"execute type {type} sql, the sql is: {sql}")
print(f"execute type {type} sql, the sql is: {sql}")
max_sql_time_cmd = f'''
grep Spent {result_file} | awk 'NR==1{{max=$7;next}}{{max=max>$7?max:$7}}END{{print "Max=",max,"s"}}'
max_sql_time = subprocess.check_output(max_sql_time_cmd, shell=True).decode("UTF-8")
# tdLog.printNoPrefix(f"type: {type} sql time : {max_sql_time}")
print(f"type: {type} sql time : {max_sql_time}")
min_sql_time_cmd = f'''
grep Spent {result_file} | awk 'NR==1{{min=$7;next}}{{min=min<$7?min:$7}}END{{print "Min=",min,"s"}}'
min_sql_time = subprocess.check_output(min_sql_time_cmd, shell=True).decode("UTF-8")
# tdLog.printNoPrefix(f"type: {type} sql time : {min_sql_time}")
print(f"type: {type} sql time : {min_sql_time}")
avg_sql_time_cmd = f'''
grep Spent {result_file} |awk '{{sum+=$7}}END{{print "Average=",sum/NR,"s"}}'
avg_sql_time = subprocess.check_output(avg_sql_time_cmd, shell=True).decode("UTF-8")
# tdLog.printNoPrefix(f"type: {type} sql time : {avg_sql_time}")
print(f"type: {type} sql time : {avg_sql_time}")
# tdLog.printNoPrefix(f"==========type {type} sql is over==========")
# tdLog.printNoPrefix("==========query operation is over==========")
# tdLog.printNoPrefix("==========tmp file has been deleted==========")
# def stop(self):
# tdSql.close()
# tdLog.success(f"{__file__} successfully executed")
# tdCases.addLinux(__file__, TDTestCase())
# tdCases.addWindows(__file__, TDTestCase())
if __name__ == '__main__':
parser = argparse.ArgumentParser()
help='clear cache before query (default: False)')
help='Database name to be created (default: db)')
help='Database keep parameters (default: 36500)')
args = parser.parse_args()
jointest = JoinPerf(args.remove_cache, args.database_name, args.keep_time)
\ No newline at end of file
......@@ -516,6 +516,15 @@ if [ "$2" != "sim" ] && [ "$2" != "python" ] && [ "$2" != "jdbc" ] && [ "$2" !=
echo "asyncdemo pass"
totalExamplePass=`expr $totalExamplePass + 1`
./demo > /dev/null 2>&1
if [ $? != "0" ]; then
echo "demo failed"
totalExampleFailed=`expr $totalExampleFailed + 1`
echo "demo pass"
totalExamplePass=`expr $totalExamplePass + 1`
if [ "$totalExamplePass" -gt "0" ]; then
echo -e "\n${GREEN} ### Total $totalExamplePass examples succeed! ### ${NC}"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册