未验证 提交 6e3322d3 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Feature/sangshuduo/td 3434 taosdemo support query times (#5541)

* [TD-3434] <feature>: taosdemo support query times

first commit.

* [TD-3434] <feature>: taosdemo support query times.

refine interlace test case.

* [TD-3434] <feature>: taosdemo support query times.

change cmdline parameters according to TD-3431.

* [TD-3434] <feature>: taosdemo support query times.

fix typo in fulltest.sh

* [TD-3434] <feature>: taosdemo support query times.

add 2 sec sleep to avoid segfault.

* [TD-3434] <feature>: taosdemo support query times.

make query test easier to pass.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 51d5b560
......@@ -199,6 +199,7 @@ typedef struct SArguments_S {
int num_of_CPR;
int num_of_threads;
int insert_interval;
int query_times;
int interlace_rows;
int num_of_RPR;
int max_sql_len;
......@@ -351,7 +352,7 @@ typedef struct SuperQueryInfo_S {
int subscribeInterval; // ms
int subscribeRestart;
int subscribeKeepProgress;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
} SuperQueryInfo;
......@@ -359,7 +360,7 @@ typedef struct SuperQueryInfo_S {
typedef struct SubQueryInfo_S {
char sTblName[MAX_TB_NAME_SIZE+1];
int rate; // 0: unlimit > 0 loop/s
int threadCnt;
int threadCnt;
int subscribeMode; // 0: sync, 1: async
int subscribeInterval; // ms
int subscribeRestart;
......@@ -367,7 +368,7 @@ typedef struct SubQueryInfo_S {
int childTblCount;
char childTblPrefix[MAX_TB_NAME_SIZE];
int sqlCount;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
......@@ -397,14 +398,14 @@ typedef struct SThreadInfo_S {
int end_table_to;
int ntables;
int data_of_rate;
uint64_t start_time;
char* cols;
bool use_metric;
uint64_t start_time;
char* cols;
bool use_metric;
SSuperTable* superTblInfo;
// for async insert
tsem_t lock_sem;
int64_t counter;
int64_t counter;
uint64_t st;
uint64_t et;
int64_t lastTs;
......@@ -547,6 +548,7 @@ SArguments g_args = {
10, // num_of_CPR
10, // num_of_connections/thread
0, // insert_interval
1, // query_times
0, // interlace_rows;
100, // num_of_RPR
TSDB_PAYLOAD_SIZE, // max_sql_len
......@@ -681,6 +683,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->num_of_threads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") == 0) {
arguments->insert_interval = atoi(argv[++i]);
} else if (strcmp(argv[i], "-qt") == 0) {
arguments->query_times = atoi(argv[++i]);
} else if (strcmp(argv[i], "-B") == 0) {
arguments->interlace_rows = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
......@@ -1379,12 +1383,12 @@ static void printfQueryMeta() {
printf("host: \033[33m%s:%u\033[0m\n",
g_queryInfo.host, g_queryInfo.port);
printf("user: \033[33m%s\033[0m\n", g_queryInfo.user);
printf("password: \033[33m%s\033[0m\n", g_queryInfo.password);
printf("database name: \033[33m%s\033[0m\n", g_queryInfo.dbName);
printf("\n");
printf("specified table query info: \n");
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.rate);
printf("query times: \033[33m%d\033[0m\n", g_args.query_times);
printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.concurrent);
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount);
......@@ -1421,7 +1425,6 @@ static void printfQueryMeta() {
SHOW_PARSE_RESULT_END();
}
static char* formatTimestamp(char* buf, int64_t val, int precision) {
time_t tt;
if (precision == TSDB_TIME_PRECISION_MICRO) {
......@@ -2045,15 +2048,25 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
printf("failed to run command %s\n", command);
taos_free_result(res);
taos_close(taos);
errorPrint("%s() LN%d, failed to run command %s\n",
__func__, __LINE__, command);
exit(-1);
}
int childTblCount = (limit < 0)?10000:limit;
int count = 0;
// childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
if (childTblName == NULL) {
childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
if (NULL == childTblName) {
taos_free_result(res);
taos_close(taos);
errorPrint("%s() LN%d, failed to allocate memory!\n", __func__, __LINE__);
exit(-1);
}
}
char* pTblName = childTblName;
while ((row = taos_fetch_row(res)) != NULL) {
int32_t* len = taos_fetch_lengths(res);
......@@ -3015,10 +3028,20 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON* rowsPerTbl = cJSON_GetObjectItem(root, "interlace_rows");
if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) {
g_args.interlace_rows = rowsPerTbl->valueint;
} else if (!rowsPerTbl) {
cJSON* gQueryTimes = cJSON_GetObjectItem(root, "query_times");
if (gQueryTimes && gQueryTimes->type == cJSON_Number) {
g_args.query_times = gQueryTimes->valueint;
} else if (!gQueryTimes) {
g_args.query_times = 1;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
cJSON* interlaceRows = cJSON_GetObjectItem(root, "interlace_rows");
if (interlaceRows && interlaceRows->type == cJSON_Number) {
g_args.interlace_rows = interlaceRows->valueint;
} else if (!interlaceRows) {
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else {
errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n", __func__, __LINE__);
......@@ -4448,7 +4471,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->totalAffectedRows = 0;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0xffffffff;
......@@ -4647,8 +4671,10 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
int timeStampStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
int timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0xffffffff;
......@@ -4657,7 +4683,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->samplePos = 0;
for (uint32_t tableSeq = pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to;
for (uint32_t tableSeq =
pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = pThreadInfo->start_time;
......@@ -4768,14 +4795,15 @@ static void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* winfo = (threadInfo*)param;
SSuperTable* superTblInfo = winfo->superTblInfo;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
winfo->et = taosGetTimestampUs();
if (((winfo->et - winfo->st)/1000) < insert_interval) {
taosMsleep(insert_interval - (winfo->et - winfo->st)/1000); // ms
}
}
char *buffer = calloc(1, winfo->superTblInfo->maxSqlLen);
char *data = calloc(1, MAX_DATA_SIZE);
char *pstr = buffer;
......@@ -4793,17 +4821,17 @@ static void callBack(void *param, TAOS_RES *res, int code) {
taos_free_result(res);
return;
}
for (int i = 0; i < g_args.num_of_RPR; i++) {
int rand_num = taosRandom() % 100;
if (0 != winfo->superTblInfo->disorderRatio && rand_num < winfo->superTblInfo->disorderRatio)
{
if (0 != winfo->superTblInfo->disorderRatio
&& rand_num < winfo->superTblInfo->disorderRatio) {
int64_t d = winfo->lastTs - taosRandom() % 1000000 + rand_num;
//generateData(data, datatype, ncols_per_record, d, len_of_binary);
(void)generateRowData(data, MAX_DATA_SIZE, d, winfo->superTblInfo);
generateRowData(data, MAX_DATA_SIZE, d, winfo->superTblInfo);
} else {
//generateData(data, datatype, ncols_per_record, start_time += 1000, len_of_binary);
(void)generateRowData(data, MAX_DATA_SIZE, winfo->lastTs += 1000, winfo->superTblInfo);
generateRowData(data, MAX_DATA_SIZE, winfo->lastTs += 1000, winfo->superTblInfo);
}
pstr += sprintf(pstr, "%s", data);
winfo->counter++;
......@@ -4831,7 +4859,8 @@ static void *asyncWrite(void *sarg) {
winfo->et = 0;
winfo->lastTs = winfo->start_time;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
winfo->st = taosGetTimestampUs();
}
......@@ -4857,7 +4886,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int ntables = 0;
if (superTblInfo) {
if ((superTblInfo->childTblOffset >= 0)
if ((superTblInfo->childTblOffset >= 0)
&& (superTblInfo->childTblLimit > 0)) {
ntables = superTblInfo->childTblLimit;
......@@ -5416,7 +5445,9 @@ static void *subQueryProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
int64_t st = 0;
int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000;
while (1) {
int queryTimes = g_args.query_times;
while (queryTimes --) {
if (g_queryInfo.subQueryInfo.rate
&& (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) {
taosMsleep(g_queryInfo.subQueryInfo.rate*1000 - (et - st)); // ms
......@@ -5444,15 +5475,21 @@ static void *subQueryProcess(void *sarg) {
winfo->end_table_to,
(double)(et - st)/1000000.0);
}
return NULL;
}
static int queryTestProcess() {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
setupForAnsiEscape();
printfQueryMeta();
resetAfterAnsiEscape();
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
g_queryInfo.port);
if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
......@@ -5467,13 +5504,11 @@ static int queryTestProcess() {
&g_queryInfo.subQueryInfo.childTblCount);
}
printfQueryMeta();
if (!g_args.answer_yes) {
printf("Press enter key to continue\n\n");
(void)getchar();
}
printfQuerySystemInfo(taos);
pthread_t *pids = NULL;
......@@ -5762,18 +5797,20 @@ static void *superSubscribeProcess(void *sarg) {
}
static int subscribeTestProcess() {
setupForAnsiEscape();
printfQueryMeta();
resetAfterAnsiEscape();
if (!g_args.answer_yes) {
printf("Press enter key to continue\n\n");
(void)getchar();
}
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
......@@ -5781,10 +5818,10 @@ static int subscribeTestProcess() {
}
if (0 != g_queryInfo.subQueryInfo.sqlCount) {
getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName,
getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName,
&g_queryInfo.subQueryInfo.childTblCount);
}
......@@ -6094,15 +6131,21 @@ static void querySqlFile(TAOS* taos, char* sqlFile)
static void testMetaFile() {
if (INSERT_TEST == g_args.test_mode) {
if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir);
insertTestProcess();
} else if (QUERY_TEST == g_args.test_mode) {
if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
queryTestProcess();
} else if (SUBSCRIBE_TEST == g_args.test_mode) {
if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
subscribeTestProcess();
} else {
;
}
......
......@@ -233,16 +233,6 @@ python3 client/twoClients.py
python3 test.py -f query/queryInterval.py
python3 test.py -f query/queryFillTest.py
# tools
python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdemoTestWithoutMetric.py
python3 test.py -f tools/taosdemoTestWithJson.py
python3 test.py -f tools/taosdemoTestLimitOffset.py
python3 test.py -f tools/taosdumpTest.py
python3 test.py -f tools/taosdemoTest2.py
python3 test.py -f tools/taosdemoTestSampleData.py
python3 test.py -f tools/taosdemoTestInterlace.py
# subscribe
python3 test.py -f subscribe/singlemeter.py
#python3 test.py -f subscribe/stability.py
......@@ -252,6 +242,18 @@ python3 test.py -f subscribe/supertable.py
#======================p3-end===============
#======================p4-start===============
# tools
python3 test.py -f tools/taosdumpTest.py
python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdemoTestWithoutMetric.py
python3 test.py -f tools/taosdemoTestWithJson.py
python3 test.py -f tools/taosdemoTestLimitOffset.py
python3 test.py -f tools/taosdemoTest2.py
python3 test.py -f tools/taosdemoTestSampleData.py
python3 test.py -f tools/taosdemoTestInterlace.py
python3 test.py -f tools/taosdemoTestQuery.py
python3 ./test.py -f update/merge_commit_data-0.py
# wal
python3 ./test.py -f wal/addOldWalTest.py
......
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "test",
"query_times": 1,
"super_table_query": {
"stblname": "meters",
"query_interval": 10,
"threads": 8,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": ""
}
]
}
}
###################################################################
##################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
......@@ -25,9 +25,6 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfTables = 10000
self.numberOfRecords = 100
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
import time
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
import subprocess
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfTables = 1000
self.numberOfRecords = 100
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath + "/build/bin/"
os.system("%staosdemo -y -t %d -n %d" %
(binPath, self.numberOfTables, self.numberOfRecords))
print("Sleep 2 seconds..")
time.sleep(2)
os.system('%staosdemo -f tools/query.json ' % binPath)
# taosdemoCmd = '%staosdemo -f tools/query.json ' % binPath
# threads = subprocess.check_output(
# taosdemoCmd, shell=True).decode("utf-8")
# print("threads: %d" % int(threads))
# if (int(threads) != 8):
# caller = inspect.getframeinfo(inspect.stack()[0][0])
# tdLog.exit(
# "%s(%d) failed: expected threads 8, actual %d" %
# (caller.filename, caller.lineno, int(threads)))
#
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册