Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bcbb6017
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bcbb6017
编写于
4月 28, 2021
作者:
S
Steven Li
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/master' into feature/crash_gen2
上级
8a6fd8df
b6e41d48
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
205 addition
and
185 deletion
+205
-185
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+205
-185
未找到文件。
src/kit/taosdemo/taosdemo.c
浏览文件 @
bcbb6017
...
...
@@ -210,13 +210,13 @@ typedef struct SArguments_S {
int
len_of_binary
;
int
num_of_CPR
;
int
num_of_threads
;
int
insert_interval
;
int
query_times
;
int
interlace_rows
;
int
num_of_RPR
;
int
max_sql_len
;
int
num_of_tables
;
int
num_of_DPT
;
int
64_t
insert_interval
;
int
64_t
query_times
;
int
64_t
interlace_rows
;
int
64_t
num_of_RPR
;
// num_of_records_per_req
int
64_t
max_sql_len
;
int
64_t
num_of_tables
;
int
64_t
num_of_DPT
;
int
abort
;
int
disorderRatio
;
// 0: no disorder, >0: x%
int
disorderRange
;
// ms or us by database precision
...
...
@@ -235,23 +235,23 @@ typedef struct SColumn_S {
typedef
struct
SSuperTable_S
{
char
sTblName
[
MAX_TB_NAME_SIZE
+
1
];
int
childTblCount
;
int
64_t
childTblCount
;
bool
childTblExists
;
// 0: no, 1: yes
int
batchCreateTableNum
;
// 0: no batch, > 0: batch table number in one sql
int
64_t
batchCreateTableNum
;
// 0: no batch, > 0: batch table number in one sql
int8_t
autoCreateTable
;
// 0: create sub table, 1: auto create sub table
char
childTblPrefix
[
MAX_TB_NAME_SIZE
];
char
dataSource
[
MAX_TB_NAME_SIZE
+
1
];
// rand_gen or sample
char
insertMode
[
MAX_TB_NAME_SIZE
];
// taosc, restful
int
childTblLimit
;
int
childTblOffset
;
int
64_t
childTblLimit
;
int
64_t
childTblOffset
;
int
multiThreadWriteOneTbl
;
// 0: no, 1: yes
int
interlaceRows
;
//
//
int multiThreadWriteOneTbl; // 0: no, 1: yes
int
64_t
interlaceRows
;
//
int
disorderRatio
;
// 0: no disorder, >0: x%
int
disorderRange
;
// ms or us by database precision
int
maxSqlLen
;
//
int
64_t
maxSqlLen
;
//
int
insertInterval
;
// insert interval, will override global insert interval
int
64_t
insertInterval
;
// insert interval, will override global insert interval
int64_t
insertRows
;
int64_t
timeStampStep
;
char
startTimestamp
[
MAX_TB_NAME_SIZE
];
...
...
@@ -266,8 +266,8 @@ typedef struct SSuperTable_S {
char
*
childTblName
;
char
*
colsOfCreateChildTable
;
int
lenOfOneRow
;
int
lenOfTagOfOneRow
;
int
64_t
lenOfOneRow
;
int
64_t
lenOfTagOfOneRow
;
char
*
sampleDataBuf
;
//int sampleRowCount;
...
...
@@ -279,8 +279,8 @@ typedef struct SSuperTable_S {
int
tagUsePos
;
// statistics
int64_t
totalInsertRows
;
int64_t
totalAffectedRows
;
int64_t
totalInsertRows
;
int64_t
totalAffectedRows
;
}
SSuperTable
;
typedef
struct
{
...
...
@@ -327,7 +327,7 @@ typedef struct SDataBase_S {
char
dbName
[
MAX_DB_NAME_SIZE
];
bool
drop
;
// 0: use exists, 1: if exists, drop then new create
SDbCfg
dbCfg
;
int
superTblCount
;
int
64_t
superTblCount
;
SSuperTable
superTbls
[
MAX_SUPER_TABLE_COUNT
];
}
SDataBase
;
...
...
@@ -349,44 +349,44 @@ typedef struct SDbs_S {
SDataBase
db
[
MAX_DB_COUNT
];
// statistics
int64_t
totalInsertRows
;
int64_t
totalAffectedRows
;
int64_t
totalInsertRows
;
int64_t
totalAffectedRows
;
}
SDbs
;
typedef
struct
SpecifiedQueryInfo_S
{
int
queryInterval
;
// 0: unlimit > 0 loop/s
int
concurrent
;
int
sqlCount
;
int
64_t
queryInterval
;
// 0: unlimit > 0 loop/s
int
64_t
concurrent
;
int
64_t
sqlCount
;
int
mode
;
// 0: sync, 1: async
int
subscribeInterval
;
// ms
int
queryTimes
;
int
64_t
subscribeInterval
;
// ms
int
64_t
queryTimes
;
int
subscribeRestart
;
int
subscribeKeepProgress
;
char
sql
[
MAX_QUERY_SQL_COUNT
][
MAX_QUERY_SQL_LENGTH
+
1
];
char
result
[
MAX_QUERY_SQL_COUNT
][
MAX_FILE_NAME_LEN
+
1
];
TAOS_SUB
*
tsub
[
MAX_QUERY_SQL_COUNT
];
int
totalQueried
;
int
64_t
totalQueried
;
}
SpecifiedQueryInfo
;
typedef
struct
SuperQueryInfo_S
{
char
sTblName
[
MAX_TB_NAME_SIZE
+
1
];
int
queryInterval
;
// 0: unlimit > 0 loop/s
int
64_t
queryInterval
;
// 0: unlimit > 0 loop/s
int
threadCnt
;
int
mode
;
// 0: sync, 1: async
int
subscribeInterval
;
// ms
int
64_t
subscribeInterval
;
// ms
int
subscribeRestart
;
int
subscribeKeepProgress
;
int
queryTimes
;
int
childTblCount
;
int
64_t
queryTimes
;
int
64_t
childTblCount
;
char
childTblPrefix
[
MAX_TB_NAME_SIZE
];
int
sqlCount
;
int
64_t
sqlCount
;
char
sql
[
MAX_QUERY_SQL_COUNT
][
MAX_QUERY_SQL_LENGTH
+
1
];
char
result
[
MAX_QUERY_SQL_COUNT
][
MAX_FILE_NAME_LEN
+
1
];
TAOS_SUB
*
tsub
[
MAX_QUERY_SQL_COUNT
];
char
*
childTblName
;
int
totalQueried
;
int
64_t
totalQueried
;
}
SuperQueryInfo
;
typedef
struct
SQueryMetaInfo_S
{
...
...
@@ -400,7 +400,7 @@ typedef struct SQueryMetaInfo_S {
SpecifiedQueryInfo
specifiedQueryInfo
;
SuperQueryInfo
superQueryInfo
;
int
totalQueried
;
int
64_t
totalQueried
;
}
SQueryMetaInfo
;
typedef
struct
SThreadInfo_S
{
...
...
@@ -410,11 +410,11 @@ typedef struct SThreadInfo_S {
uint32_t
time_precision
;
char
fp
[
4096
];
char
tb_prefix
[
MAX_TB_NAME_SIZE
];
int
start_table_from
;
int
end_table_to
;
int
ntables
;
int
data_of_rate
;
u
int64_t
start_time
;
int
64_t
start_table_from
;
int
64_t
end_table_to
;
int
64_t
ntables
;
int
64_t
data_of_rate
;
int64_t
start_time
;
char
*
cols
;
bool
use_metric
;
SSuperTable
*
superTblInfo
;
...
...
@@ -427,7 +427,7 @@ typedef struct SThreadInfo_S {
int64_t
lastTs
;
// sample data
int
samplePos
;
int
64_t
samplePos
;
// statistics
int64_t
totalInsertRows
;
int64_t
totalAffectedRows
;
...
...
@@ -440,7 +440,7 @@ typedef struct SThreadInfo_S {
int64_t
minDelay
;
// query
int
querySeq
;
// sequence number of sql command
int
64_t
querySeq
;
// sequence number of sql command
}
threadInfo
;
#ifdef WINDOWS
...
...
@@ -1001,13 +1001,18 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
break
;
printf
(
"
\n
"
);
}
printf
(
"# Insertion interval: %d
\n
"
,
arguments
->
insert_interval
);
printf
(
"# Number of records per req: %d
\n
"
,
arguments
->
num_of_RPR
);
printf
(
"# Max SQL length: %d
\n
"
,
arguments
->
max_sql_len
);
printf
(
"# Insertion interval: %"
PRId64
"
\n
"
,
arguments
->
insert_interval
);
printf
(
"# Number of records per req: %"
PRId64
"
\n
"
,
arguments
->
num_of_RPR
);
printf
(
"# Max SQL length: %"
PRId64
"
\n
"
,
arguments
->
max_sql_len
);
printf
(
"# Length of Binary: %d
\n
"
,
arguments
->
len_of_binary
);
printf
(
"# Number of Threads: %d
\n
"
,
arguments
->
num_of_threads
);
printf
(
"# Number of Tables: %d
\n
"
,
arguments
->
num_of_tables
);
printf
(
"# Number of Data per Table: %d
\n
"
,
arguments
->
num_of_DPT
);
printf
(
"# Number of Tables: %"
PRId64
"
\n
"
,
arguments
->
num_of_tables
);
printf
(
"# Number of Data per Table: %"
PRId64
"
\n
"
,
arguments
->
num_of_DPT
);
printf
(
"# Database name: %s
\n
"
,
arguments
->
database
);
printf
(
"# Table prefix: %s
\n
"
,
arguments
->
tb_prefix
);
if
(
arguments
->
disorderRatio
)
{
...
...
@@ -1244,9 +1249,12 @@ static int printfInsertMeta() {
printf
(
"resultFile:
\033
[33m%s
\033
[0m
\n
"
,
g_Dbs
.
resultFile
);
printf
(
"thread num of insert data:
\033
[33m%d
\033
[0m
\n
"
,
g_Dbs
.
threadCount
);
printf
(
"thread num of create table:
\033
[33m%d
\033
[0m
\n
"
,
g_Dbs
.
threadCountByCreateTbl
);
printf
(
"top insert interval:
\033
[33m%d
\033
[0m
\n
"
,
g_args
.
insert_interval
);
printf
(
"number of records per req:
\033
[33m%d
\033
[0m
\n
"
,
g_args
.
num_of_RPR
);
printf
(
"max sql length:
\033
[33m%d
\033
[0m
\n
"
,
g_args
.
max_sql_len
);
printf
(
"top insert interval:
\033
[33m%"
PRId64
"
\033
[0m
\n
"
,
g_args
.
insert_interval
);
printf
(
"number of records per req:
\033
[33m%"
PRId64
"
\033
[0m
\n
"
,
g_args
.
num_of_RPR
);
printf
(
"max sql length:
\033
[33m%"
PRId64
"
\033
[0m
\n
"
,
g_args
.
max_sql_len
);
printf
(
"database count:
\033
[33m%d
\033
[0m
\n
"
,
g_Dbs
.
dbCount
);
...
...
@@ -1307,10 +1315,10 @@ static int printfInsertMeta() {
}
}
printf
(
" super table count:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
" super table count:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTblCount
);
for
(
int
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
printf
(
" super table[
\033
[33m%
d
\033
[0m]:
\n
"
,
j
);
for
(
int
64_t
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
printf
(
" super table[
\033
[33m%
"
PRId64
"
\033
[0m]:
\n
"
,
j
);
printf
(
" stbName:
\033
[33m%s
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
sTblName
);
...
...
@@ -1331,7 +1339,7 @@ static int printfInsertMeta() {
printf
(
" childTblExists:
\033
[33m%s
\033
[0m
\n
"
,
"error"
);
}
printf
(
" childTblCount:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
" childTblCount:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblCount
);
printf
(
" childTblPrefix:
\033
[33m%s
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblPrefix
);
...
...
@@ -1340,26 +1348,27 @@ static int printfInsertMeta() {
printf
(
" insertMode:
\033
[33m%s
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertMode
);
if
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblLimit
>
0
)
{
printf
(
" childTblLimit:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
" childTblLimit:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblLimit
);
}
if
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblOffset
>=
0
)
{
printf
(
" childTblOffset:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
" childTblOffset:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblOffset
);
}
printf
(
" insertRows:
\033
[33m%"
PRId64
"
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertRows
);
/*
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n");
}else {
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
}
printf
(
" interlaceRows:
\033
[33m%d
\033
[0m
\n
"
,
*/
printf
(
" interlaceRows:
\033
[33m%"
PRId64
"
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
);
if
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
>
0
)
{
printf
(
" stable insert interval:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
" stable insert interval:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertInterval
);
}
...
...
@@ -1367,7 +1376,7 @@ static int printfInsertMeta() {
g_Dbs
.
db
[
i
].
superTbls
[
j
].
disorderRange
);
printf
(
" disorderRatio:
\033
[33m%d
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
disorderRatio
);
printf
(
" maxSqlLen:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
" maxSqlLen:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
maxSqlLen
);
printf
(
" timeStampStep:
\033
[33m%"
PRId64
"
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
timeStampStep
);
...
...
@@ -1433,8 +1442,8 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf
(
fp
,
"resultFile: %s
\n
"
,
g_Dbs
.
resultFile
);
fprintf
(
fp
,
"thread num of insert data: %d
\n
"
,
g_Dbs
.
threadCount
);
fprintf
(
fp
,
"thread num of create table: %d
\n
"
,
g_Dbs
.
threadCountByCreateTbl
);
fprintf
(
fp
,
"number of records per req: %
d
\n
"
,
g_args
.
num_of_RPR
);
fprintf
(
fp
,
"max sql length: %
d
\n
"
,
g_args
.
max_sql_len
);
fprintf
(
fp
,
"number of records per req: %
"
PRId64
"
\n
"
,
g_args
.
num_of_RPR
);
fprintf
(
fp
,
"max sql length: %
"
PRId64
"
\n
"
,
g_args
.
max_sql_len
);
fprintf
(
fp
,
"database count: %d
\n
"
,
g_Dbs
.
dbCount
);
for
(
int
i
=
0
;
i
<
g_Dbs
.
dbCount
;
i
++
)
{
...
...
@@ -1491,7 +1500,7 @@ static void printfInsertMetaToFile(FILE* fp) {
}
}
fprintf
(
fp
,
" super table count: %
d
\n
"
,
g_Dbs
.
db
[
i
].
superTblCount
);
fprintf
(
fp
,
" super table count: %
"
PRId64
"
\n
"
,
g_Dbs
.
db
[
i
].
superTblCount
);
for
(
int
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
fprintf
(
fp
,
" super table[%d]:
\n
"
,
j
);
...
...
@@ -1513,7 +1522,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf
(
fp
,
" childTblExists: %s
\n
"
,
"error"
);
}
fprintf
(
fp
,
" childTblCount: %
d
\n
"
,
fprintf
(
fp
,
" childTblCount: %
"
PRId64
"
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblCount
);
fprintf
(
fp
,
" childTblPrefix: %s
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblPrefix
);
...
...
@@ -1523,26 +1532,30 @@ static void printfInsertMetaToFile(FILE* fp) {
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertMode
);
fprintf
(
fp
,
" insertRows: %"
PRId64
"
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertRows
);
fprintf
(
fp
,
" interlace rows: %
d
\n
"
,
fprintf
(
fp
,
" interlace rows: %
"
PRId64
"
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
);
if
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
>
0
)
{
fprintf
(
fp
,
" stable insert interval: %
d
\n
"
,
fprintf
(
fp
,
" stable insert interval: %
"
PRId64
"
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertInterval
);
}
/*
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
fprintf(fp, " multiThreadWriteOneTbl: no\n");
}else {
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
}
fprintf
(
fp
,
" interlaceRows: %d
\n
"
,
*/
fprintf
(
fp
,
" interlaceRows: %"
PRId64
"
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
);
fprintf
(
fp
,
" disorderRange: %d
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
disorderRange
);
fprintf
(
fp
,
" disorderRatio: %d
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
disorderRatio
);
fprintf
(
fp
,
" maxSqlLen: %d
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
maxSqlLen
);
fprintf
(
fp
,
" maxSqlLen: %"
PRId64
"
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
maxSqlLen
);
fprintf
(
fp
,
" timeStampStep: %"
PRId64
"
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
timeStampStep
);
fprintf
(
fp
,
" startTimestamp: %s
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
startTimestamp
);
fprintf
(
fp
,
" timeStampStep: %"
PRId64
"
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
timeStampStep
);
fprintf
(
fp
,
" startTimestamp: %s
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
startTimestamp
);
fprintf
(
fp
,
" sampleFormat: %s
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
sampleFormat
);
fprintf
(
fp
,
" sampleFile: %s
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
sampleFile
);
fprintf
(
fp
,
" tagsFile: %s
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
tagsFile
);
...
...
@@ -1597,21 +1610,21 @@ static void printfQueryMeta() {
printf
(
"
\n
"
);
printf
(
"specified table query info:
\n
"
);
printf
(
"query interval:
\033
[33m%
d
ms
\033
[0m
\n
"
,
printf
(
"query interval:
\033
[33m%
"
PRId64
"
ms
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
queryInterval
);
printf
(
"top query times:
\033
[33m%
d
\033
[0m
\n
"
,
g_args
.
query_times
);
printf
(
"concurrent:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
"top query times:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_args
.
query_times
);
printf
(
"concurrent:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
concurrent
);
printf
(
"sqlCount:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
"sqlCount:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
);
printf
(
"specified tbl query times:
\n
"
);
printf
(
"
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
"
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
queryTimes
);
if
(
SUBSCRIBE_TEST
==
g_args
.
test_mode
)
{
printf
(
"mod:
\033
[33m%d
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
mode
);
printf
(
"interval:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
"interval:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeInterval
);
printf
(
"restart:
\033
[33m%d
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeRestart
);
...
...
@@ -1619,27 +1632,27 @@ static void printfQueryMeta() {
g_queryInfo
.
specifiedQueryInfo
.
subscribeKeepProgress
);
}
for
(
int
i
=
0
;
i
<
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
;
i
++
)
{
printf
(
" sql[%
d
]:
\033
[33m%s
\033
[0m
\n
"
,
for
(
int
64_t
i
=
0
;
i
<
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
;
i
++
)
{
printf
(
" sql[%
"
PRId64
"
]:
\033
[33m%s
\033
[0m
\n
"
,
i
,
g_queryInfo
.
specifiedQueryInfo
.
sql
[
i
]);
}
printf
(
"
\n
"
);
printf
(
"super table query info:
\n
"
);
printf
(
"query interval:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
"query interval:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
queryInterval
);
printf
(
"threadCnt:
\033
[33m%d
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
threadCnt
);
printf
(
"childTblCount:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
"childTblCount:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
childTblCount
);
printf
(
"stable name:
\033
[33m%s
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
sTblName
);
printf
(
"stb query times:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
"stb query times:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
queryTimes
);
if
(
SUBSCRIBE_TEST
==
g_args
.
test_mode
)
{
printf
(
"mod:
\033
[33m%d
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
mode
);
printf
(
"interval:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
"interval:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
subscribeInterval
);
printf
(
"restart:
\033
[33m%d
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
subscribeRestart
);
...
...
@@ -1647,7 +1660,7 @@ static void printfQueryMeta() {
g_queryInfo
.
superQueryInfo
.
subscribeKeepProgress
);
}
printf
(
"sqlCount:
\033
[33m%
d
\033
[0m
\n
"
,
printf
(
"sqlCount:
\033
[33m%
"
PRId64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
sqlCount
);
for
(
int
i
=
0
;
i
<
g_queryInfo
.
superQueryInfo
.
sqlCount
;
i
++
)
{
printf
(
" sql[%d]:
\033
[33m%s
\033
[0m
\n
"
,
...
...
@@ -2278,7 +2291,7 @@ static int calcRowLen(SSuperTable* superTbls) {
static
int
getChildNameOfSuperTableWithLimitAndOffset
(
TAOS
*
taos
,
char
*
dbName
,
char
*
sTblName
,
char
**
childTblNameOfSuperTbl
,
int
*
childTblCountOfSuperTbl
,
int
limit
,
in
t
offset
)
{
int
64_t
*
childTblCountOfSuperTbl
,
int64_t
limit
,
int64_
t
offset
)
{
char
command
[
BUFFER_SIZE
]
=
"
\0
"
;
char
limitBuf
[
100
]
=
"
\0
"
;
...
...
@@ -2289,7 +2302,8 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
char
*
childTblName
=
*
childTblNameOfSuperTbl
;
if
(
offset
>=
0
)
{
snprintf
(
limitBuf
,
100
,
" limit %d offset %d"
,
limit
,
offset
);
snprintf
(
limitBuf
,
100
,
" limit %"
PRId64
" offset %"
PRId64
""
,
limit
,
offset
);
}
//get all child table name use cmd: select tbname from superTblName;
...
...
@@ -2354,7 +2368,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
static
int
getAllChildNameOfSuperTable
(
TAOS
*
taos
,
char
*
dbName
,
char
*
sTblName
,
char
**
childTblNameOfSuperTbl
,
int
*
childTblCountOfSuperTbl
)
{
int
64_t
*
childTblCountOfSuperTbl
)
{
return
getChildNameOfSuperTableWithLimitAndOffset
(
taos
,
dbName
,
sTblName
,
childTblNameOfSuperTbl
,
childTblCountOfSuperTbl
,
...
...
@@ -2694,7 +2708,7 @@ static int createDatabasesAndStables() {
printf
(
"
\n
create database %s success!
\n\n
"
,
g_Dbs
.
db
[
i
].
dbName
);
}
debugPrint
(
"%s()
%d supertbl count:%d
\n
"
,
debugPrint
(
"%s()
LN%d supertbl count:%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
superTblCount
);
int
validStbCount
=
0
;
...
...
@@ -2753,14 +2767,15 @@ static void* createTable(void *sarg)
int
len
=
0
;
int
batchNum
=
0
;
verbosePrint
(
"%s() LN%d: Creating table from %
d to %d
\n
"
,
verbosePrint
(
"%s() LN%d: Creating table from %
"
PRId64
" to %"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
pThreadInfo
->
start_table_from
,
pThreadInfo
->
end_table_to
);
for
(
int
i
=
pThreadInfo
->
start_table_from
;
i
<=
pThreadInfo
->
end_table_to
;
i
++
)
{
for
(
int64_t
i
=
pThreadInfo
->
start_table_from
;
i
<=
pThreadInfo
->
end_table_to
;
i
++
)
{
if
(
0
==
g_Dbs
.
use_metric
)
{
snprintf
(
buffer
,
buff_len
,
"create table if not exists %s.%s%
d
%s;"
,
"create table if not exists %s.%s%
"
PRId64
"
%s;"
,
pThreadInfo
->
db_name
,
g_args
.
tb_prefix
,
i
,
pThreadInfo
->
cols
);
...
...
@@ -2791,7 +2806,7 @@ static void* createTable(void *sarg)
}
len
+=
snprintf
(
buffer
+
len
,
buff_len
-
len
,
"if not exists %s.%s%
d
using %s.%s tags %s "
,
"if not exists %s.%s%
"
PRId64
"
using %s.%s tags %s "
,
pThreadInfo
->
db_name
,
superTblInfo
->
childTblPrefix
,
i
,
pThreadInfo
->
db_name
,
superTblInfo
->
sTblName
,
tagsValBuf
);
...
...
@@ -2815,7 +2830,7 @@ static void* createTable(void *sarg)
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
printf
(
"thread[%d] already create %
d - %d
tables
\n
"
,
printf
(
"thread[%d] already create %
"
PRId64
" - %"
PRId64
"
tables
\n
"
,
pThreadInfo
->
threadID
,
pThreadInfo
->
start_table_from
,
i
);
lastPrintTime
=
currentPrintTime
;
}
...
...
@@ -2833,7 +2848,7 @@ static void* createTable(void *sarg)
}
static
int
startMultiThreadCreateChildTable
(
char
*
cols
,
int
threads
,
int
startFrom
,
in
t
ntables
,
char
*
cols
,
int
threads
,
int
64_t
startFrom
,
int64_
t
ntables
,
char
*
db_name
,
SSuperTable
*
superTblInfo
)
{
pthread_t
*
pids
=
malloc
(
threads
*
sizeof
(
pthread_t
));
...
...
@@ -2848,16 +2863,16 @@ static int startMultiThreadCreateChildTable(
threads
=
1
;
}
int
a
=
ntables
/
threads
;
int
64_t
a
=
ntables
/
threads
;
if
(
a
<
1
)
{
threads
=
ntables
;
a
=
1
;
}
int
b
=
0
;
int
64_t
b
=
0
;
b
=
ntables
%
threads
;
for
(
int
i
=
0
;
i
<
threads
;
i
++
)
{
for
(
int
64_t
i
=
0
;
i
<
threads
;
i
++
)
{
threadInfo
*
t_info
=
infos
+
i
;
t_info
->
threadID
=
i
;
tstrncpy
(
t_info
->
db_name
,
db_name
,
MAX_DB_NAME_SIZE
);
...
...
@@ -2949,7 +2964,7 @@ static void createChildTables() {
snprintf
(
tblColsBuf
+
len
,
MAX_SQL_SIZE
-
len
,
")"
);
verbosePrint
(
"%s() LN%d: dbName: %s num of tb: %
d
schema: %s
\n
"
,
verbosePrint
(
"%s() LN%d: dbName: %s num of tb: %
"
PRId64
"
schema: %s
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
dbName
,
g_args
.
num_of_tables
,
tblColsBuf
);
startMultiThreadCreateChildTable
(
...
...
@@ -3077,7 +3092,7 @@ static int readSampleFromCsvFileToMem(
}
if
(
readLen
>
superTblInfo
->
lenOfOneRow
)
{
printf
(
"sample row len[%d] overflow define schema len[%
d
], so discard this row
\n
"
,
printf
(
"sample row len[%d] overflow define schema len[%
"
PRId64
"
], so discard this row
\n
"
,
(
int32_t
)
readLen
,
superTblInfo
->
lenOfOneRow
);
continue
;
}
...
...
@@ -3344,9 +3359,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
// rows per table need be less than insert batch
if
(
g_args
.
interlace_rows
>
g_args
.
num_of_RPR
)
{
printf
(
"NOTICE: interlace rows value %
d > num_of_records_per_req %d
\n\n
"
,
printf
(
"NOTICE: interlace rows value %
"
PRId64
" > num_of_records_per_req %"
PRId64
"
\n\n
"
,
g_args
.
interlace_rows
,
g_args
.
num_of_RPR
);
printf
(
" interlace rows value will be set to num_of_records_per_req %
d
\n\n
"
,
printf
(
" interlace rows value will be set to num_of_records_per_req %
"
PRId64
"
\n\n
"
,
g_args
.
num_of_RPR
);
printf
(
" press Enter key to continue or Ctrl-C to stop."
);
(
void
)
getchar
();
...
...
@@ -3375,7 +3390,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if
(
numRecPerReq
&&
numRecPerReq
->
type
==
cJSON_Number
)
{
g_args
.
num_of_RPR
=
numRecPerReq
->
valueint
;
}
else
if
(
!
numRecPerReq
)
{
g_args
.
num_of_RPR
=
INT
32
_MAX
;
g_args
.
num_of_RPR
=
INT
64
_MAX
;
}
else
{
errorPrint
(
"%s() LN%d, failed to read json, num_of_records_per_req not found
\n
"
,
__func__
,
__LINE__
);
...
...
@@ -3847,9 +3862,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
=
interlaceRows
->
valueint
;
// rows per table need be less than insert batch
if
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
>
g_args
.
num_of_RPR
)
{
printf
(
"NOTICE: db[%d].superTbl[%d]'s interlace rows value %
d > num_of_records_per_req %d
\n\n
"
,
printf
(
"NOTICE: db[%d].superTbl[%d]'s interlace rows value %
"
PRId64
" > num_of_records_per_req %"
PRId64
"
\n\n
"
,
i
,
j
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
,
g_args
.
num_of_RPR
);
printf
(
" interlace rows value will be set to num_of_records_per_req %
d
\n\n
"
,
printf
(
" interlace rows value will be set to num_of_records_per_req %
"
PRId64
"
\n\n
"
,
g_args
.
num_of_RPR
);
printf
(
" press Enter key to continue or Ctrl-C to stop."
);
(
void
)
getchar
();
...
...
@@ -3905,7 +3920,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if
(
insertInterval
&&
insertInterval
->
type
==
cJSON_Number
)
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertInterval
=
insertInterval
->
valueint
;
}
else
if
(
!
insertInterval
)
{
verbosePrint
(
"%s() LN%d: stable insert interval be overrided by global %
d
.
\n
"
,
verbosePrint
(
"%s() LN%d: stable insert interval be overrided by global %
"
PRId64
"
.
\n
"
,
__func__
,
__LINE__
,
g_args
.
insert_interval
);
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertInterval
=
g_args
.
insert_interval
;
}
else
{
...
...
@@ -4045,7 +4060,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if
(
concurrent
&&
concurrent
->
type
==
cJSON_Number
)
{
g_queryInfo
.
specifiedQueryInfo
.
concurrent
=
concurrent
->
valueint
;
if
(
g_queryInfo
.
specifiedQueryInfo
.
concurrent
<=
0
)
{
errorPrint
(
"%s() LN%d, query sqlCount %
d or concurrent %d
is not correct.
\n
"
,
errorPrint
(
"%s() LN%d, query sqlCount %
"
PRId64
" or concurrent %"
PRId64
"
is not correct.
\n
"
,
__func__
,
__LINE__
,
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
,
g_queryInfo
.
specifiedQueryInfo
.
concurrent
);
goto
PARSE_OVER
;
...
...
@@ -4410,8 +4425,9 @@ static void postFreeResource() {
}
}
static
int
getRowDataFromSample
(
char
*
dataBuf
,
int
maxLen
,
int64_t
timestamp
,
SSuperTable
*
superTblInfo
,
int
*
sampleUsePos
)
{
static
int
getRowDataFromSample
(
char
*
dataBuf
,
int64_t
maxLen
,
int64_t
timestamp
,
SSuperTable
*
superTblInfo
,
int64_t
*
sampleUsePos
)
{
if
((
*
sampleUsePos
)
==
MAX_SAMPLES_ONCE_FROM_FILE
)
{
/* int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
...
...
@@ -4436,10 +4452,10 @@ static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp,
return
dataLen
;
}
static
int
generateRowData
(
char
*
recBuf
,
int64_t
timestamp
,
SSuperTable
*
stbInfo
)
{
int
dataLen
=
0
;
static
int
64_t
generateRowData
(
char
*
recBuf
,
int64_t
timestamp
,
SSuperTable
*
stbInfo
)
{
int
64_t
dataLen
=
0
;
char
*
pstr
=
recBuf
;
int
maxLen
=
MAX_DATA_SIZE
;
int
64_t
maxLen
=
MAX_DATA_SIZE
;
dataLen
+=
snprintf
(
pstr
+
dataLen
,
maxLen
-
dataLen
,
"(%"
PRId64
", "
,
timestamp
);
...
...
@@ -4506,7 +4522,7 @@ static int generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stbInfo
return
strlen
(
recBuf
);
}
static
int
32
_t
generateData
(
char
*
recBuf
,
char
**
data_type
,
static
int
64
_t
generateData
(
char
*
recBuf
,
char
**
data_type
,
int
num_of_cols
,
int64_t
timestamp
,
int
lenOfBinary
)
{
memset
(
recBuf
,
0
,
MAX_DATA_SIZE
);
char
*
pstr
=
recBuf
;
...
...
@@ -4572,7 +4588,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
sampleDataBuf
=
calloc
(
superTblInfo
->
lenOfOneRow
*
MAX_SAMPLES_ONCE_FROM_FILE
,
1
);
if
(
sampleDataBuf
==
NULL
)
{
errorPrint
(
"%s() LN%d, Failed to calloc %
d
Bytes, reason:%s
\n
"
,
errorPrint
(
"%s() LN%d, Failed to calloc %
"
PRId64
"
Bytes, reason:%s
\n
"
,
__func__
,
__LINE__
,
superTblInfo
->
lenOfOneRow
*
MAX_SAMPLES_ONCE_FROM_FILE
,
strerror
(
errno
));
...
...
@@ -4593,7 +4609,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return
0
;
}
static
int
execInsert
(
threadInfo
*
pThreadInfo
,
char
*
buffer
,
int
k
)
static
int
64_t
execInsert
(
threadInfo
*
pThreadInfo
,
char
*
buffer
,
int
k
)
{
int
affectedRows
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
...
...
@@ -4619,7 +4635,7 @@ static int execInsert(threadInfo *pThreadInfo, char *buffer, int k)
return
affectedRows
;
}
static
void
getTableName
(
char
*
pTblName
,
threadInfo
*
pThreadInfo
,
int
tableSeq
)
static
void
getTableName
(
char
*
pTblName
,
threadInfo
*
pThreadInfo
,
int
64_t
tableSeq
)
{
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
if
(
superTblInfo
)
{
...
...
@@ -4630,7 +4646,7 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
(
tableSeq
-
superTblInfo
->
childTblOffset
)
*
TSDB_TABLE_NAME_LEN
);
}
else
{
verbosePrint
(
"[%d] %s() LN%d: from=%
d count=%d seq=%d
\n
"
,
verbosePrint
(
"[%d] %s() LN%d: from=%
"
PRId64
" count=%"
PRId64
" seq=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
pThreadInfo
->
start_table_from
,
pThreadInfo
->
ntables
,
tableSeq
);
...
...
@@ -4638,16 +4654,16 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
superTblInfo
->
childTblName
+
tableSeq
*
TSDB_TABLE_NAME_LEN
);
}
}
else
{
snprintf
(
pTblName
,
TSDB_TABLE_NAME_LEN
,
"%s%
d
"
,
snprintf
(
pTblName
,
TSDB_TABLE_NAME_LEN
,
"%s%
"
PRId64
"
"
,
g_args
.
tb_prefix
,
tableSeq
);
}
}
static
int
generateDataTail
(
static
int
64_t
generateDataTail
(
SSuperTable
*
superTblInfo
,
int
batch
,
char
*
buffer
,
in
t
remainderBufLen
,
int64_t
insertRows
,
int64_t
startFrom
,
int64_t
startTime
,
int
*
pSamplePos
,
in
t
*
dataLen
)
{
int
len
=
0
;
int
64_t
batch
,
char
*
buffer
,
int64_
t
remainderBufLen
,
int64_t
insertRows
,
int64_t
startFrom
,
int64_t
startTime
,
int
64_t
*
pSamplePos
,
int64_
t
*
dataLen
)
{
int
64_t
len
=
0
;
int
ncols_per_record
=
1
;
// count first col ts
char
*
pstr
=
buffer
;
...
...
@@ -4660,14 +4676,14 @@ static int generateDataTail(
}
}
verbosePrint
(
"%s() LN%d batch=%
d
\n
"
,
__func__
,
__LINE__
,
batch
);
verbosePrint
(
"%s() LN%d batch=%
"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
batch
);
int
k
=
0
;
int
64_t
k
=
0
;
for
(
k
=
0
;
k
<
batch
;)
{
char
data
[
MAX_DATA_SIZE
];
memset
(
data
,
0
,
MAX_DATA_SIZE
);
int
retLen
=
0
;
int
64_t
retLen
=
0
;
if
(
superTblInfo
)
{
if
(
0
==
strncasecmp
(
superTblInfo
->
dataSource
,
...
...
@@ -4681,16 +4697,16 @@ static int generateDataTail(
}
else
if
(
0
==
strncasecmp
(
superTblInfo
->
dataSource
,
"rand"
,
strlen
(
"rand"
)))
{
int
randTail
=
superTblInfo
->
timeStampStep
*
k
;
int
64_t
randTail
=
superTblInfo
->
timeStampStep
*
k
;
if
(
superTblInfo
->
disorderRatio
>
0
)
{
int
rand_num
=
taosRandom
()
%
100
;
if
(
rand_num
<
superTblInfo
->
disorderRatio
)
{
randTail
=
(
randTail
+
(
taosRandom
()
%
superTblInfo
->
disorderRange
+
1
))
*
(
-
1
);
debugPrint
(
"rand data generated, back %
d
\n
"
,
randTail
);
debugPrint
(
"rand data generated, back %
"
PRId64
"
\n
"
,
randTail
);
}
}
u
int64_t
d
=
startTime
int64_t
d
=
startTime
+
randTail
;
retLen
=
generateRowData
(
data
,
...
...
@@ -4710,14 +4726,15 @@ static int generateDataTail(
char
**
data_type
=
g_args
.
datatype
;
int
lenOfBinary
=
g_args
.
len_of_binary
;
int
rand_num
=
taosRandom
()
%
100
;
int
randTail
;
int64_t
randTail
=
DEFAULT_TIMESTAMP_STEP
*
k
;
if
(
g_args
.
disorderRatio
!=
0
)
{
int
rand_num
=
taosRandom
()
%
100
;
if
(
rand_num
<
g_args
.
disorderRatio
)
{
randTail
=
(
randTail
+
(
taosRandom
()
%
g_args
.
disorderRange
+
1
))
*
(
-
1
);
if
((
g_args
.
disorderRatio
!=
0
)
&&
(
rand_num
<
g_args
.
disorderRatio
))
{
randTail
=
(
DEFAULT_TIMESTAMP_STEP
*
k
+
(
taosRandom
()
%
g_args
.
disorderRange
+
1
))
*
(
-
1
);
debugPrint
(
"rand data generated, back %d
\n
"
,
randTail
);
debugPrint
(
"rand data generated, back %"
PRId64
"
\n
"
,
randTail
);
}
}
else
{
randTail
=
DEFAULT_TIMESTAMP_STEP
*
k
;
}
...
...
@@ -4736,7 +4753,7 @@ static int generateDataTail(
remainderBufLen
-=
retLen
;
}
verbosePrint
(
"%s() LN%d len=%
d k=%d
\n
buffer=%s
\n
"
,
verbosePrint
(
"%s() LN%d len=%
"
PRId64
" k=%"
PRId64
"
\n
buffer=%s
\n
"
,
__func__
,
__LINE__
,
len
,
k
,
buffer
);
startFrom
++
;
...
...
@@ -4817,13 +4834,13 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
return
len
;
}
static
int
generateInterlaceDataBuffer
(
char
*
tableName
,
int
batchPerTbl
,
int
i
,
in
t
batchPerTblTimes
,
int
32
_t
tableSeq
,
static
int
64_t
generateInterlaceDataBuffer
(
char
*
tableName
,
int
64_t
batchPerTbl
,
int64_t
i
,
int64_
t
batchPerTblTimes
,
int
64
_t
tableSeq
,
threadInfo
*
pThreadInfo
,
char
*
buffer
,
int64_t
insertRows
,
int64_t
startTime
,
int
*
pRemainderBufLen
)
int
64_t
*
pRemainderBufLen
)
{
assert
(
buffer
);
char
*
pstr
=
buffer
;
...
...
@@ -4836,15 +4853,15 @@ static int generateInterlaceDataBuffer(
return
0
;
}
// generate data buffer
verbosePrint
(
"[%d] %s() LN%d i=%
d
buffer:
\n
%s
\n
"
,
verbosePrint
(
"[%d] %s() LN%d i=%
"
PRId64
"
buffer:
\n
%s
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
i
,
buffer
);
pstr
+=
headLen
;
*
pRemainderBufLen
-=
headLen
;
int
dataLen
=
0
;
int
64_t
dataLen
=
0
;
verbosePrint
(
"[%d] %s() LN%d i=%
d batchPerTblTimes=%d batchPerTbl = %d
\n
"
,
verbosePrint
(
"[%d] %s() LN%d i=%
"
PRId64
" batchPerTblTimes=%"
PRId64
" batchPerTbl = %"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
i
,
batchPerTblTimes
,
batchPerTbl
);
...
...
@@ -4856,7 +4873,7 @@ static int generateInterlaceDataBuffer(
startTime
=
1500000000000
;
}
int
k
=
generateDataTail
(
int
64_t
k
=
generateDataTail
(
superTblInfo
,
batchPerTbl
,
pstr
,
*
pRemainderBufLen
,
insertRows
,
0
,
startTime
,
...
...
@@ -4866,7 +4883,7 @@ static int generateInterlaceDataBuffer(
pstr
+=
dataLen
;
*
pRemainderBufLen
-=
dataLen
;
}
else
{
debugPrint
(
"%s() LN%d, generated data tail: %
d, not equal batch per table: %d
\n
"
,
debugPrint
(
"%s() LN%d, generated data tail: %
"
PRId64
", not equal batch per table: %"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
k
,
batchPerTbl
);
pstr
-=
headLen
;
pstr
[
0
]
=
'\0'
;
...
...
@@ -4878,11 +4895,11 @@ static int generateInterlaceDataBuffer(
static
int
generateProgressiveDataBuffer
(
char
*
tableName
,
int
32
_t
tableSeq
,
int
64
_t
tableSeq
,
threadInfo
*
pThreadInfo
,
char
*
buffer
,
int64_t
insertRows
,
int64_t
startFrom
,
int64_t
startTime
,
int
*
pSamplePos
,
int
*
pRemainderBufLen
)
int64_t
startFrom
,
int64_t
startTime
,
int
64_t
*
pSamplePos
,
int
64_t
*
pRemainderBufLen
)
{
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
...
...
@@ -4899,11 +4916,11 @@ static int generateProgressiveDataBuffer(
assert
(
buffer
!=
NULL
);
char
*
pstr
=
buffer
;
int
k
=
0
;
int
64_t
k
=
0
;
memset
(
buffer
,
0
,
*
pRemainderBufLen
);
int
headLen
=
generateSQLHead
(
tableName
,
tableSeq
,
pThreadInfo
,
superTblInfo
,
int
64_t
headLen
=
generateSQLHead
(
tableName
,
tableSeq
,
pThreadInfo
,
superTblInfo
,
buffer
,
*
pRemainderBufLen
);
if
(
headLen
<=
0
)
{
...
...
@@ -4912,7 +4929,7 @@ static int generateProgressiveDataBuffer(
pstr
+=
headLen
;
*
pRemainderBufLen
-=
headLen
;
int
dataLen
;
int
64_t
dataLen
;
k
=
generateDataTail
(
superTblInfo
,
g_args
.
num_of_RPR
,
pstr
,
*
pRemainderBufLen
,
insertRows
,
startFrom
,
startTime
,
...
...
@@ -4926,7 +4943,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo
->
threadID
,
__func__
,
__LINE__
);
int64_t
insertRows
;
int
interlaceRows
;
int
64_t
interlaceRows
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
...
...
@@ -4961,10 +4978,10 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
// TODO: prompt tbl count multple interlace rows and batch
//
int
maxSqlLen
=
superTblInfo
?
superTblInfo
->
maxSqlLen
:
g_args
.
max_sql_len
;
int
64_t
maxSqlLen
=
superTblInfo
?
superTblInfo
->
maxSqlLen
:
g_args
.
max_sql_len
;
char
*
buffer
=
calloc
(
maxSqlLen
,
1
);
if
(
NULL
==
buffer
)
{
errorPrint
(
"%s() LN%d, Failed to alloc %
d
Bytes, reason:%s
\n
"
,
errorPrint
(
"%s() LN%d, Failed to alloc %
"
PRId64
"
Bytes, reason:%s
\n
"
,
__func__
,
__LINE__
,
maxSqlLen
,
strerror
(
errno
));
return
NULL
;
}
...
...
@@ -4978,16 +4995,16 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int
insert_interval
=
superTblInfo
?
superTblInfo
->
insertInterval
:
g_args
.
insert_interval
;
u
int64_t
st
=
0
;
u
int64_t
et
=
0xffffffff
;
int64_t
st
=
0
;
int64_t
et
=
0xffffffff
;
int64_t
lastPrintTime
=
taosGetTimestampMs
();
int64_t
startTs
=
taosGetTimestampMs
();
int64_t
endTs
;
int
tableSeq
=
pThreadInfo
->
start_table_from
;
int
64_t
tableSeq
=
pThreadInfo
->
start_table_from
;
debugPrint
(
"[%d] %s() LN%d: start_table_from=%
d ntables=%d
insertRows=%"
PRId64
"
\n
"
,
debugPrint
(
"[%d] %s() LN%d: start_table_from=%
"
PRId64
" ntables=%"
PRId64
"
insertRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
pThreadInfo
->
start_table_from
,
pThreadInfo
->
ntables
,
insertRows
);
...
...
@@ -4995,9 +5012,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
assert
(
pThreadInfo
->
ntables
>
0
);
int
batchPerTbl
=
interlaceRows
;
int
64_t
batchPerTbl
=
interlaceRows
;
int
batchPerTblTimes
;
int
64_t
batchPerTblTimes
;
if
((
interlaceRows
>
0
)
&&
(
pThreadInfo
->
ntables
>
1
))
{
batchPerTblTimes
=
g_args
.
num_of_RPR
/
interlaceRows
;
...
...
@@ -5005,9 +5022,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
batchPerTblTimes
=
1
;
}
int
generatedRecPerTbl
=
0
;
int
64_t
generatedRecPerTbl
=
0
;
bool
flagSleep
=
true
;
int
sleepTimeTotal
=
0
;
int
64_t
sleepTimeTotal
=
0
;
char
*
strInsertInto
=
"insert into "
;
int
nInsertBufLen
=
strlen
(
strInsertInto
);
...
...
@@ -5019,7 +5036,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
}
// generate data
memset
(
buffer
,
0
,
maxSqlLen
);
int
remainderBufLen
=
maxSqlLen
;
int
64_t
remainderBufLen
=
maxSqlLen
;
char
*
pstr
=
buffer
;
...
...
@@ -5027,9 +5044,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pstr
+=
len
;
remainderBufLen
-=
len
;
int
recOfBatch
=
0
;
int
64_t
recOfBatch
=
0
;
for
(
int
i
=
0
;
i
<
batchPerTblTimes
;
i
++
)
{
for
(
int
64_t
i
=
0
;
i
<
batchPerTblTimes
;
i
++
)
{
getTableName
(
tableName
,
pThreadInfo
,
tableSeq
);
if
(
0
==
strlen
(
tableName
))
{
errorPrint
(
"[%d] %s() LN%d, getTableName return null
\n
"
,
...
...
@@ -5038,8 +5055,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
return
NULL
;
}
int
oldRemainderLen
=
remainderBufLen
;
int
generated
=
generateInterlaceDataBuffer
(
int
64_t
oldRemainderLen
=
remainderBufLen
;
int
64_t
generated
=
generateInterlaceDataBuffer
(
tableName
,
batchPerTbl
,
i
,
batchPerTblTimes
,
tableSeq
,
pThreadInfo
,
pstr
,
...
...
@@ -5048,7 +5065,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
&
remainderBufLen
);
if
(
generated
<
0
)
{
debugPrint
(
"[%d] %s() LN%d, generated data is %
d
\n
"
,
debugPrint
(
"[%d] %s() LN%d, generated data is %
"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
goto
free_and_statistics_interlace
;
}
else
if
(
generated
==
0
)
{
...
...
@@ -5060,7 +5077,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pstr
+=
(
oldRemainderLen
-
remainderBufLen
);
// startTime += batchPerTbl * superTblInfo->timeStampStep;
pThreadInfo
->
totalInsertRows
+=
batchPerTbl
;
verbosePrint
(
"[%d] %s() LN%d batchPerTbl=%
d recOfBatch=%d
\n
"
,
verbosePrint
(
"[%d] %s() LN%d batchPerTbl=%
"
PRId64
" recOfBatch=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
batchPerTbl
,
recOfBatch
);
...
...
@@ -5086,7 +5103,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
}
}
verbosePrint
(
"[%d] %s() LN%d generatedRecPerTbl=%
d
insertRows=%"
PRId64
"
\n
"
,
verbosePrint
(
"[%d] %s() LN%d generatedRecPerTbl=%
"
PRId64
"
insertRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generatedRecPerTbl
,
insertRows
);
...
...
@@ -5094,7 +5111,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
break
;
}
verbosePrint
(
"[%d] %s() LN%d recOfBatch=%
d
totalInsertRows=%"
PRId64
"
\n
"
,
verbosePrint
(
"[%d] %s() LN%d recOfBatch=%
"
PRId64
"
totalInsertRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
,
pThreadInfo
->
totalInsertRows
);
verbosePrint
(
"[%d] %s() LN%d, buffer=%s
\n
"
,
...
...
@@ -5102,7 +5119,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
startTs
=
taosGetTimestampMs
();
int
affectedRows
=
execInsert
(
pThreadInfo
,
buffer
,
recOfBatch
);
int
64_t
affectedRows
=
execInsert
(
pThreadInfo
,
buffer
,
recOfBatch
);
endTs
=
taosGetTimestampMs
();
int64_t
delay
=
endTs
-
startTs
;
...
...
@@ -5114,10 +5131,11 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo
->
cntDelay
++
;
pThreadInfo
->
totalDelay
+=
delay
;
verbosePrint
(
"[%d] %s() LN%d affectedRows=%d
\n
"
,
pThreadInfo
->
threadID
,
verbosePrint
(
"[%d] %s() LN%d affectedRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
affectedRows
);
if
((
affectedRows
<
0
)
||
(
recOfBatch
!=
affectedRows
))
{
errorPrint
(
"[%d] %s() LN%d execInsert insert %
d, affected rows: %d
\n
%s
\n
"
,
errorPrint
(
"[%d] %s() LN%d execInsert insert %
"
PRId64
", affected rows: %"
PRId64
"
\n
%s
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
,
affectedRows
,
buffer
);
goto
free_and_statistics_interlace
;
...
...
@@ -5196,7 +5214,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo
->
samplePos
=
0
;
for
(
uint32
_t
tableSeq
=
for
(
int64
_t
tableSeq
=
pThreadInfo
->
start_table_from
;
tableSeq
<=
pThreadInfo
->
end_table_to
;
tableSeq
++
)
{
int64_t
start_time
=
pThreadInfo
->
start_time
;
...
...
@@ -5213,11 +5231,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
char
tableName
[
TSDB_TABLE_NAME_LEN
];
getTableName
(
tableName
,
pThreadInfo
,
tableSeq
);
verbosePrint
(
"%s() LN%d: tid=%d seq=%
d
tableName=%s
\n
"
,
verbosePrint
(
"%s() LN%d: tid=%d seq=%
"
PRId64
"
tableName=%s
\n
"
,
__func__
,
__LINE__
,
pThreadInfo
->
threadID
,
tableSeq
,
tableName
);
int
remainderBufLen
=
maxSqlLen
;
int
64_t
remainderBufLen
=
maxSqlLen
;
char
*
pstr
=
buffer
;
int
nInsertBufLen
=
strlen
(
"insert into "
);
...
...
@@ -5241,7 +5259,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
startTs
=
taosGetTimestampMs
();
int
affectedRows
=
execInsert
(
pThreadInfo
,
buffer
,
generated
);
int
64_t
affectedRows
=
execInsert
(
pThreadInfo
,
buffer
,
generated
);
endTs
=
taosGetTimestampMs
();
int64_t
delay
=
endTs
-
startTs
;
...
...
@@ -5287,7 +5305,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if
((
tableSeq
==
pThreadInfo
->
ntables
-
1
)
&&
superTblInfo
&&
(
0
==
strncasecmp
(
superTblInfo
->
dataSource
,
"sample"
,
strlen
(
"sample"
))))
{
verbosePrint
(
"%s() LN%d samplePos=%
d
\n
"
,
verbosePrint
(
"%s() LN%d samplePos=%
"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
pThreadInfo
->
samplePos
);
}
}
...
...
@@ -5346,7 +5364,8 @@ static void callBack(void *param, TAOS_RES *res, int code) {
char
*
buffer
=
calloc
(
1
,
pThreadInfo
->
superTblInfo
->
maxSqlLen
);
char
data
[
MAX_DATA_SIZE
];
char
*
pstr
=
buffer
;
pstr
+=
sprintf
(
pstr
,
"insert into %s.%s%d values"
,
pThreadInfo
->
db_name
,
pThreadInfo
->
tb_prefix
,
pstr
+=
sprintf
(
pstr
,
"insert into %s.%s%"
PRId64
" values"
,
pThreadInfo
->
db_name
,
pThreadInfo
->
tb_prefix
,
pThreadInfo
->
start_table_from
);
// if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
if
(
pThreadInfo
->
counter
>=
g_args
.
num_of_RPR
)
{
...
...
@@ -5544,7 +5563,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
exit
(
-
1
);
}
int
childTblCount
;
int
64_t
childTblCount
;
getChildNameOfSuperTableWithLimitAndOffset
(
taos
,
db_name
,
superTblInfo
->
sTblName
,
...
...
@@ -5595,18 +5614,19 @@ static void startMultiThreadInsertData(int threads, char* db_name,
t_info
->
taos
=
NULL
;
}
if
((
NULL
==
superTblInfo
)
/*
if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
*/
t_info
->
start_table_from
=
startFrom
;
t_info
->
ntables
=
i
<
b
?
a
+
1
:
a
;
t_info
->
end_table_to
=
i
<
b
?
startFrom
+
a
:
startFrom
+
a
-
1
;
startFrom
=
t_info
->
end_table_to
+
1
;
}
else
{
/*
} else {
t_info->start_table_from = 0;
t_info->ntables = superTblInfo->childTblCount;
t_info->start_time = t_info->start_time + rand_int() % 10000 - rand_tinyint();
}
*/
tsem_init
(
&
(
t_info
->
lock_sem
),
0
,
0
);
if
(
SYNC
==
g_Dbs
.
queryMode
)
{
pthread_create
(
pids
+
i
,
NULL
,
syncWrite
,
t_info
);
...
...
@@ -6108,7 +6128,7 @@ static void *superTableQuery(void *sarg) {
}
}
et
=
taosGetTimestampMs
();
printf
(
"####thread[%"
PRId64
"] complete all sqls to allocate all sub-tables[%
d - %d
] once queries duration:%.4fs
\n\n
"
,
printf
(
"####thread[%"
PRId64
"] complete all sqls to allocate all sub-tables[%
"
PRId64
" - %"
PRId64
"
] once queries duration:%.4fs
\n\n
"
,
taosGetSelfPthreadId
(),
pThreadInfo
->
start_table_from
,
pThreadInfo
->
end_table_to
,
...
...
@@ -6524,7 +6544,7 @@ static int subscribeTestProcess() {
//==== create sub threads for query from super table
if
((
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
<=
0
)
||
(
g_queryInfo
.
specifiedQueryInfo
.
concurrent
<=
0
))
{
errorPrint
(
"%s() LN%d, query sqlCount %
d or concurrent %d
is not correct.
\n
"
,
errorPrint
(
"%s() LN%d, query sqlCount %
"
PRId64
" or concurrent %"
PRId64
"
is not correct.
\n
"
,
__func__
,
__LINE__
,
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
,
g_queryInfo
.
specifiedQueryInfo
.
concurrent
);
exit
(
-
1
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录