Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
58c6a901
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
58c6a901
编写于
5月 29, 2021
作者:
P
Ping Xiao
浏览文件
操作
浏览文件
下载
差异文件
Merge branches 'xiaoping/test_case' and 'master' of
https://github.com/taosdata/TDengine
上级
1465fd9c
934517b0
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
1158 addition
and
520 deletion
+1158
-520
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+4
-0
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+1100
-513
src/os/src/detail/osTime.c
src/os/src/detail/osTime.c
+2
-2
src/vnode/src/vnodeMgmt.c
src/vnode/src/vnodeMgmt.c
+4
-4
tests/pytest/fulltest.sh
tests/pytest/fulltest.sh
+1
-1
tests/pytest/tag_lite/drop_auto_create.py
tests/pytest/tag_lite/drop_auto_create.py
+47
-0
未找到文件。
src/client/src/tscParseInsert.c
浏览文件 @
58c6a901
...
...
@@ -769,6 +769,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
);
if
(
sToken
.
type
==
TK_ILLEGAL
)
{
return
tscSQLSyntaxErrMsg
(
pCmd
->
payload
,
"unrecognized token"
,
sToken
.
z
);
}
if
(
sToken
.
type
==
TK_RP
)
{
break
;
}
...
...
src/kit/taosdemo/taosdemo.c
浏览文件 @
58c6a901
...
...
@@ -19,6 +19,7 @@
*/
#include <stdint.h>
#include <taos.h>
#define _GNU_SOURCE
#define CURL_STATICLIB
...
...
@@ -52,6 +53,8 @@
#include "taoserror.h"
#include "tutil.h"
#define STMT_IFACE_ENABLED 0
#define REQ_EXTRA_BUF_LEN 1024
#define RESP_BUF_LEN 4096
...
...
@@ -61,6 +64,8 @@ extern char configDir[];
#define QUERY_JSON_NAME "query.json"
#define SUBSCRIBE_JSON_NAME "subscribe.json"
#define STR_INSERT_INTO "INSERT INTO "
enum
TEST_MODE
{
INSERT_TEST
,
// 0
QUERY_TEST
,
// 1
...
...
@@ -70,6 +75,8 @@ enum TEST_MODE {
#define MAX_RECORDS_PER_REQ 32766
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into ..
#define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2)
#define COND_BUF_LEN BUFFER_SIZE - 30
...
...
@@ -120,17 +127,24 @@ enum enumSYNC_MODE {
MODE_BUT
};
enum
enum_TAOS_INTERFACE
{
TAOSC_IFACE
,
REST_IFACE
,
STMT_IFACE
,
INTERFACE_BUT
};
typedef
enum
enumQUERY_CLASS
{
SPECIFIED_CLASS
,
STABLE_CLASS
,
CLASS_BUT
}
QUERY_CLASS
;
typedef
enum
enum_
INSERT_MOD
E
{
typedef
enum
enum_
PROGRESSIVE_OR_INTERLAC
E
{
PROGRESSIVE_INSERT_MODE
,
INTERLACE_INSERT_MODE
,
INVALID_INSERT_MODE
}
INSERT
_MODE
;
}
PROG_OR_INTERLACE
_MODE
;
typedef
enum
enumQUERY_TYPE
{
NO_INSERT_TYPE
,
...
...
@@ -196,6 +210,7 @@ typedef struct SArguments_S {
uint32_t
test_mode
;
char
*
host
;
uint16_t
port
;
uint16_t
iface
;
char
*
user
;
char
*
password
;
char
*
database
;
...
...
@@ -217,13 +232,13 @@ typedef struct SArguments_S {
uint32_t
num_of_threads
;
uint64_t
insert_interval
;
int64_t
query_times
;
uint
64
_t
interlace_rows
;
uint
64
_t
num_of_RPR
;
// num_of_records_per_req
uint
32
_t
interlace_rows
;
uint
32
_t
num_of_RPR
;
// num_of_records_per_req
uint64_t
max_sql_len
;
int64_t
num_of_tables
;
int64_t
num_of_DPT
;
int
abort
;
int
disorderRatio
;
// 0: no disorder, >0: x%
uint32_t
disorderRatio
;
// 0: no disorder, >0: x%
int
disorderRange
;
// ms or us by database precision
uint32_t
method_of_delete
;
char
**
arg_list
;
...
...
@@ -240,18 +255,19 @@ typedef struct SColumn_S {
typedef
struct
SSuperTable_S
{
char
sTblName
[
MAX_TB_NAME_SIZE
+
1
];
char
dataSource
[
MAX_TB_NAME_SIZE
+
1
];
// rand_gen or sample
char
childTblPrefix
[
MAX_TB_NAME_SIZE
];
char
insertMode
[
MAX_TB_NAME_SIZE
];
// taosc, rest
uint16_t
childTblExists
;
int64_t
childTblCount
;
bool
childTblExists
;
// 0: no, 1: yes
uint64_t
batchCreateTableNum
;
// 0: no batch, > 0: batch table number in one sql
uint8_t
autoCreateTable
;
// 0: create sub table, 1: auto create sub table
char
childTblPrefix
[
MAX_TB_NAME_SIZE
];
char
dataSource
[
MAX_TB_NAME_SIZE
+
1
];
// rand_gen or sample
char
insertMode
[
MAX_TB_NAME_SIZE
];
// taosc, rest
uint16_t
iface
;
// 0: taosc, 1: rest, 2: stmt
int64_t
childTblLimit
;
uint64_t
childTblOffset
;
// int multiThreadWriteOneTbl; // 0: no, 1: yes
uint
64
_t
interlaceRows
;
//
uint
32
_t
interlaceRows
;
//
int
disorderRatio
;
// 0: no disorder, >0: x%
int
disorderRange
;
// ms or us by database precision
uint64_t
maxSqlLen
;
//
...
...
@@ -420,6 +436,7 @@ typedef struct SQueryMetaInfo_S {
typedef
struct
SThreadInfo_S
{
TAOS
*
taos
;
TAOS_STMT
*
stmt
;
int
threadID
;
char
db_name
[
MAX_DB_NAME_SIZE
+
1
];
uint32_t
time_precision
;
...
...
@@ -434,6 +451,7 @@ typedef struct SThreadInfo_S {
char
*
cols
;
bool
use_metric
;
SSuperTable
*
superTblInfo
;
char
*
buffer
;
// sql cmd buffer
// for async insert
tsem_t
lock_sem
;
...
...
@@ -541,7 +559,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
static
int
postProceSql
(
char
*
host
,
struct
sockaddr_in
*
pServAddr
,
uint16_t
port
,
char
*
sqlstr
,
threadInfo
*
pThreadInfo
);
static
int64_t
getTSRandTail
(
int64_t
timeStampStep
,
int32_t
seq
,
int
disorderRatio
,
int
disorderRange
);
int
disorderRatio
,
int
disorderRange
);
/* ************ Global variables ************ */
...
...
@@ -557,6 +575,7 @@ SArguments g_args = {
0
,
// test_mode
"127.0.0.1"
,
// host
6030
,
// port
TAOSC_IFACE
,
// iface
"root"
,
// user
#ifdef _TD_POWER_
"powerdb"
,
// password
...
...
@@ -673,6 +692,8 @@ static void printHelp() {
"The host to connect to TDengine. Default is localhost."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-p"
,
indent
,
"The TCP/IP port number to use for the connection. Default is 0."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-I"
,
indent
,
"The interface (taosc, rest, and stmt) taosdemo uses. Default is 'taosc'."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-d"
,
indent
,
"Destination database. Default is 'test'."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-a"
,
indent
,
...
...
@@ -761,6 +782,23 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
exit
(
EXIT_FAILURE
);
}
arguments
->
port
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-I"
)
==
0
)
{
if
(
argc
==
i
+
1
)
{
printHelp
();
errorPrint
(
"%s"
,
"
\n\t
-I need a valid string following!
\n
"
);
exit
(
EXIT_FAILURE
);
}
++
i
;
if
(
0
==
strcasecmp
(
argv
[
i
],
"taosc"
))
{
arguments
->
iface
=
TAOSC_IFACE
;
}
else
if
(
0
==
strcasecmp
(
argv
[
i
],
"rest"
))
{
arguments
->
iface
=
REST_IFACE
;
}
else
if
(
0
==
strcasecmp
(
argv
[
i
],
"stmt"
))
{
arguments
->
iface
=
STMT_IFACE
;
}
else
{
errorPrint
(
"%s"
,
"
\n\t
-I need a valid string following!
\n
"
);
exit
(
EXIT_FAILURE
);
}
}
else
if
(
strcmp
(
argv
[
i
],
"-u"
)
==
0
)
{
if
(
argc
==
i
+
1
)
{
printHelp
();
...
...
@@ -793,7 +831,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
printHelp
();
errorPrint
(
"%s"
,
"
\n\t
-q need a number following!
\n
Query mode -- 0: SYNC,
1
: ASYNC. Default is SYNC.
\n
"
);
errorPrint
(
"%s"
,
"
\n\t
-q need a number following!
\n
Query mode -- 0: SYNC,
not-0
: ASYNC. Default is SYNC.
\n
"
);
exit
(
EXIT_FAILURE
);
}
arguments
->
async_mode
=
atoi
(
argv
[
++
i
]);
...
...
@@ -897,6 +935,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
&&
strcasecmp
(
argv
[
i
],
"BIGINT"
)
&&
strcasecmp
(
argv
[
i
],
"DOUBLE"
)
&&
strcasecmp
(
argv
[
i
],
"BINARY"
)
&&
strcasecmp
(
argv
[
i
],
"TIMESTAMP"
)
&&
strcasecmp
(
argv
[
i
],
"NCHAR"
))
{
printHelp
();
errorPrint
(
"%s"
,
"-b: Invalid data_type!
\n
"
);
...
...
@@ -918,6 +957,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
&&
strcasecmp
(
token
,
"BIGINT"
)
&&
strcasecmp
(
token
,
"DOUBLE"
)
&&
strcasecmp
(
token
,
"BINARY"
)
&&
strcasecmp
(
token
,
"TIMESTAMP"
)
&&
strcasecmp
(
token
,
"NCHAR"
))
{
printHelp
();
free
(
g_dupstr
);
...
...
@@ -1019,6 +1059,19 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
}
int
columnCount
;
for
(
columnCount
=
0
;
columnCount
<
MAX_NUM_DATATYPE
;
columnCount
++
)
{
if
(
g_args
.
datatype
[
columnCount
]
==
NULL
)
{
break
;
}
}
if
(
0
==
columnCount
)
{
perror
(
"data type error!"
);
exit
(
-
1
);
}
g_args
.
num_of_CPR
=
columnCount
;
if
(((
arguments
->
debug_print
)
&&
(
arguments
->
metaFile
==
NULL
))
||
arguments
->
verbose_print
)
{
printf
(
"###################################################################
\n
"
);
...
...
@@ -1028,7 +1081,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments
->
port
);
printf
(
"# User: %s
\n
"
,
arguments
->
user
);
printf
(
"# Password: %s
\n
"
,
arguments
->
password
);
printf
(
"# Use metric: %s
\n
"
,
arguments
->
use_metric
?
"true"
:
"false"
);
printf
(
"# Use metric: %s
\n
"
,
arguments
->
use_metric
?
"true"
:
"false"
);
if
(
*
(
arguments
->
datatype
))
{
printf
(
"# Specified data type: "
);
for
(
int
i
=
0
;
i
<
MAX_NUM_DATATYPE
;
i
++
)
...
...
@@ -1040,7 +1094,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
printf
(
"# Insertion interval: %"
PRIu64
"
\n
"
,
arguments
->
insert_interval
);
printf
(
"# Number of records per req: %
"
PRIu64
"
\n
"
,
printf
(
"# Number of records per req: %
u
\n
"
,
arguments
->
num_of_RPR
);
printf
(
"# Max SQL length: %"
PRIu64
"
\n
"
,
arguments
->
max_sql_len
);
...
...
@@ -1068,8 +1122,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
static
bool
getInfoFromJsonFile
(
char
*
file
);
//static int generateOneRowDataForStb(SSuperTable* stbInfo);
//static int getDataIntoMemForStb(SSuperTable* stbInfo);
static
void
init_rand_data
();
static
void
tmfclose
(
FILE
*
fp
)
{
if
(
NULL
!=
fp
)
{
...
...
@@ -1088,7 +1140,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
TAOS_RES
*
res
=
NULL
;
int32_t
code
=
-
1
;
for
(
i
=
0
;
i
<
5
;
i
++
)
{
for
(
i
=
0
;
i
<
5
/* retry */
;
i
++
)
{
if
(
NULL
!=
res
)
{
taos_free_result
(
res
);
res
=
NULL
;
...
...
@@ -1104,7 +1156,8 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
verbosePrint
(
"%s() LN%d - command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
if
(
code
!=
0
)
{
if
(
!
quiet
)
{
errorPrint
(
"Failed to execute %s, reason: %s
\n
"
,
command
,
taos_errstr
(
res
));
errorPrint
(
"Failed to execute %s, reason: %s
\n
"
,
command
,
taos_errstr
(
res
));
}
taos_free_result
(
res
);
//taos_close(taos);
...
...
@@ -1320,6 +1373,8 @@ static void init_rand_data() {
static
int
printfInsertMeta
()
{
SHOW_PARSE_RESULT_START
();
printf
(
"interface:
\033
[33m%s
\033
[0m
\n
"
,
(
g_args
.
iface
==
TAOSC_IFACE
)
?
"taosc"
:
(
g_args
.
iface
==
REST_IFACE
)
?
"rest"
:
"stmt"
);
printf
(
"host:
\033
[33m%s:%u
\033
[0m
\n
"
,
g_Dbs
.
host
,
g_Dbs
.
port
);
printf
(
"user:
\033
[33m%s
\033
[0m
\n
"
,
g_Dbs
.
user
);
...
...
@@ -1331,7 +1386,7 @@ static int printfInsertMeta() {
g_Dbs
.
threadCountByCreateTbl
);
printf
(
"top insert interval:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_args
.
insert_interval
);
printf
(
"number of records per req:
\033
[33m%
"
PRIu64
"
\033
[0m
\n
"
,
printf
(
"number of records per req:
\033
[33m%
u
\033
[0m
\n
"
,
g_args
.
num_of_RPR
);
printf
(
"max sql length:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_args
.
max_sql_len
);
...
...
@@ -1417,7 +1472,8 @@ static int printfInsertMeta() {
if
(
PRE_CREATE_SUBTBL
==
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
)
{
printf
(
" autoCreateTable:
\033
[33m%s
\033
[0m
\n
"
,
"no"
);
}
else
if
(
AUTO_CREATE_SUBTBL
==
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
)
{
}
else
if
(
AUTO_CREATE_SUBTBL
==
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
)
{
printf
(
" autoCreateTable:
\033
[33m%s
\033
[0m
\n
"
,
"yes"
);
}
else
{
printf
(
" autoCreateTable:
\033
[33m%s
\033
[0m
\n
"
,
"error"
);
...
...
@@ -1437,8 +1493,9 @@ static int printfInsertMeta() {
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblPrefix
);
printf
(
" dataSource:
\033
[33m%s
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
dataSource
);
printf
(
" insertMode:
\033
[33m%s
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertMode
);
printf
(
" iface:
\033
[33m%s
\033
[0m
\n
"
,
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
iface
==
TAOSC_IFACE
)
?
"taosc"
:
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
iface
==
REST_IFACE
)
?
"rest"
:
"stmt"
);
if
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblLimit
>
0
)
{
printf
(
" childTblLimit:
\033
[33m%"
PRId64
"
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblLimit
);
...
...
@@ -1456,7 +1513,7 @@ static int printfInsertMeta() {
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
}
*/
printf
(
" interlaceRows:
\033
[33m%
"
PRIu64
"
\033
[0m
\n
"
,
printf
(
" interlaceRows:
\033
[33m%
u
\033
[0m
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
);
if
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
>
0
)
{
...
...
@@ -1534,7 +1591,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf
(
fp
,
"resultFile: %s
\n
"
,
g_Dbs
.
resultFile
);
fprintf
(
fp
,
"thread num of insert data: %d
\n
"
,
g_Dbs
.
threadCount
);
fprintf
(
fp
,
"thread num of create table: %d
\n
"
,
g_Dbs
.
threadCountByCreateTbl
);
fprintf
(
fp
,
"number of records per req: %
"
PRIu64
"
\n
"
,
g_args
.
num_of_RPR
);
fprintf
(
fp
,
"number of records per req: %
u
\n
"
,
g_args
.
num_of_RPR
);
fprintf
(
fp
,
"max sql length: %"
PRIu64
"
\n
"
,
g_args
.
max_sql_len
);
fprintf
(
fp
,
"database count: %d
\n
"
,
g_Dbs
.
dbCount
);
...
...
@@ -1626,11 +1683,12 @@ static void printfInsertMetaToFile(FILE* fp) {
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblPrefix
);
fprintf
(
fp
,
" dataSource: %s
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
dataSource
);
fprintf
(
fp
,
" insertMode: %s
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertMode
);
fprintf
(
fp
,
" iface: %s
\n
"
,
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
iface
==
TAOSC_IFACE
)
?
"taosc"
:
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
iface
==
REST_IFACE
)
?
"rest"
:
"stmt"
);
fprintf
(
fp
,
" insertRows: %"
PRId64
"
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertRows
);
fprintf
(
fp
,
" interlace rows: %
"
PRIu64
"
\n
"
,
fprintf
(
fp
,
" interlace rows: %
u
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
);
if
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
>
0
)
{
fprintf
(
fp
,
" stable insert interval: %"
PRIu64
"
\n
"
,
...
...
@@ -1643,7 +1701,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
}
*/
fprintf
(
fp
,
" interlaceRows: %
"
PRIu64
"
\n
"
,
fprintf
(
fp
,
" interlaceRows: %
u
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
);
fprintf
(
fp
,
" disorderRange: %d
\n
"
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
disorderRange
);
...
...
@@ -2803,7 +2861,7 @@ static int createDatabasesAndStables() {
int
validStbCount
=
0
;
for
(
in
t
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
for
(
uint64_
t
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
sprintf
(
command
,
"describe %s.%s;"
,
g_Dbs
.
db
[
i
].
dbName
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
sTblName
);
ret
=
queryDbExec
(
taos
,
command
,
NO_INSERT_TYPE
,
true
);
...
...
@@ -2813,7 +2871,7 @@ static int createDatabasesAndStables() {
&
g_Dbs
.
db
[
i
].
superTbls
[
j
]);
if
(
0
!=
ret
)
{
errorPrint
(
"create super table %
d
failed!
\n\n
"
,
j
);
errorPrint
(
"create super table %
"
PRIu64
"
failed!
\n\n
"
,
j
);
continue
;
}
}
...
...
@@ -2841,7 +2899,7 @@ static void* createTable(void *sarg)
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
int64_t
lastPrintTime
=
taosGetTimestampMs
();
u
int64_t
lastPrintTime
=
taosGetTimestampMs
();
int
buff_len
;
buff_len
=
BUFFER_SIZE
/
8
;
...
...
@@ -2915,7 +2973,7 @@ static void* createTable(void *sarg)
return
NULL
;
}
int64_t
currentPrintTime
=
taosGetTimestampMs
();
u
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
printf
(
"thread[%d] already create %"
PRIu64
" - %"
PRIu64
" tables
\n
"
,
pThreadInfo
->
threadID
,
pThreadInfo
->
start_table_from
,
i
);
...
...
@@ -2934,11 +2992,11 @@ static void* createTable(void *sarg)
}
static
int
startMultiThreadCreateChildTable
(
char
*
cols
,
int
threads
,
uint64_t
start
From
,
int64_t
ntables
,
char
*
cols
,
int
threads
,
uint64_t
table
From
,
int64_t
ntables
,
char
*
db_name
,
SSuperTable
*
superTblInfo
)
{
pthread_t
*
pids
=
malloc
(
threads
*
sizeof
(
pthread_t
));
threadInfo
*
infos
=
malloc
(
threads
*
sizeof
(
threadInfo
));
threadInfo
*
infos
=
calloc
(
1
,
threads
*
sizeof
(
threadInfo
));
if
((
NULL
==
pids
)
||
(
NULL
==
infos
))
{
printf
(
"malloc failed
\n
"
);
...
...
@@ -2978,10 +3036,10 @@ static int startMultiThreadCreateChildTable(
return
-
1
;
}
pThreadInfo
->
start_table_from
=
start
From
;
pThreadInfo
->
start_table_from
=
table
From
;
pThreadInfo
->
ntables
=
i
<
b
?
a
+
1
:
a
;
pThreadInfo
->
end_table_to
=
i
<
b
?
startFrom
+
a
:
start
From
+
a
-
1
;
start
From
=
pThreadInfo
->
end_table_to
+
1
;
pThreadInfo
->
end_table_to
=
i
<
b
?
tableFrom
+
a
:
table
From
+
a
-
1
;
table
From
=
pThreadInfo
->
end_table_to
+
1
;
pThreadInfo
->
use_metric
=
true
;
pThreadInfo
->
cols
=
cols
;
pThreadInfo
->
minDelay
=
UINT64_MAX
;
...
...
@@ -3007,61 +3065,61 @@ static void createChildTables() {
char
tblColsBuf
[
MAX_SQL_SIZE
];
int
len
;
for
(
int
i
=
0
;
i
<
g_Dbs
.
dbCount
;
i
++
)
{
if
(
g_Dbs
.
use_metric
)
{
if
(
g_Dbs
.
db
[
i
].
superTblCount
>
0
)
{
// with super table
for
(
int
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
if
((
AUTO_CREATE_SUBTBL
==
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
)
||
(
TBL_ALREADY_EXISTS
==
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblExists
))
{
continue
;
}
for
(
int
i
=
0
;
i
<
g_Dbs
.
dbCount
;
i
++
)
{
if
(
g_Dbs
.
use_metric
)
{
if
(
g_Dbs
.
db
[
i
].
superTblCount
>
0
)
{
// with super table
for
(
int
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
if
((
AUTO_CREATE_SUBTBL
==
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
)
||
(
TBL_ALREADY_EXISTS
==
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblExists
))
{
continue
;
}
verbosePrint
(
"%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
);
uint64_t
startFrom
=
0
;
g_totalChildTables
+=
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblCount
;
verbosePrint
(
"%s() LN%d: create %"
PRId64
" child tables from %"
PRIu64
"
\n
"
,
__func__
,
__LINE__
,
g_totalChildTables
,
startFrom
);
startMultiThreadCreateChildTable
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
,
g_Dbs
.
threadCountByCreateTbl
,
startFrom
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblCount
,
g_Dbs
.
db
[
i
].
dbName
,
&
(
g_Dbs
.
db
[
i
].
superTbls
[
j
]));
}
}
}
else
{
// normal table
len
=
snprintf
(
tblColsBuf
,
MAX_SQL_SIZE
,
"(TS TIMESTAMP"
);
for
(
int
j
=
0
;
j
<
g_args
.
num_of_CPR
;
j
++
)
{
if
((
strncasecmp
(
g_args
.
datatype
[
j
],
"BINARY"
,
strlen
(
"BINARY"
))
==
0
)
||
(
strncasecmp
(
g_args
.
datatype
[
j
],
"NCHAR"
,
strlen
(
"NCHAR"
))
==
0
))
{
snprintf
(
tblColsBuf
+
len
,
MAX_SQL_SIZE
-
len
,
", COL%d %s(%d)"
,
j
,
g_args
.
datatype
[
j
],
g_args
.
len_of_binary
);
}
else
{
snprintf
(
tblColsBuf
+
len
,
MAX_SQL_SIZE
-
len
,
", COL%d %s"
,
j
,
g_args
.
datatype
[
j
]);
}
len
=
strlen
(
tblColsBuf
);
}
verbosePrint
(
"%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
);
uint64_t
startFrom
=
0
;
g_totalChildTables
+=
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblCount
;
verbosePrint
(
"%s() LN%d: create %"
PRId64
" child tables from %"
PRIu64
"
\n
"
,
__func__
,
__LINE__
,
g_totalChildTables
,
startFrom
);
startMultiThreadCreateChildTable
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
,
g_Dbs
.
threadCountByCreateTbl
,
startFrom
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblCount
,
g_Dbs
.
db
[
i
].
dbName
,
&
(
g_Dbs
.
db
[
i
].
superTbls
[
j
]));
snprintf
(
tblColsBuf
+
len
,
MAX_SQL_SIZE
-
len
,
")"
);
verbosePrint
(
"%s() LN%d: dbName: %s num of tb: %"
PRId64
" schema: %s
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
dbName
,
g_args
.
num_of_tables
,
tblColsBuf
);
startMultiThreadCreateChildTable
(
tblColsBuf
,
g_Dbs
.
threadCountByCreateTbl
,
0
,
g_args
.
num_of_tables
,
g_Dbs
.
db
[
i
].
dbName
,
NULL
);
}
}
}
else
{
// normal table
len
=
snprintf
(
tblColsBuf
,
MAX_SQL_SIZE
,
"(TS TIMESTAMP"
);
for
(
int
j
=
0
;
j
<
g_args
.
num_of_CPR
;
j
++
)
{
if
((
strncasecmp
(
g_args
.
datatype
[
j
],
"BINARY"
,
strlen
(
"BINARY"
))
==
0
)
||
(
strncasecmp
(
g_args
.
datatype
[
j
],
"NCHAR"
,
strlen
(
"NCHAR"
))
==
0
))
{
snprintf
(
tblColsBuf
+
len
,
MAX_SQL_SIZE
-
len
,
", COL%d %s(%d)"
,
j
,
g_args
.
datatype
[
j
],
g_args
.
len_of_binary
);
}
else
{
snprintf
(
tblColsBuf
+
len
,
MAX_SQL_SIZE
-
len
,
", COL%d %s"
,
j
,
g_args
.
datatype
[
j
]);
}
len
=
strlen
(
tblColsBuf
);
}
snprintf
(
tblColsBuf
+
len
,
MAX_SQL_SIZE
-
len
,
")"
);
verbosePrint
(
"%s() LN%d: dbName: %s num of tb: %"
PRId64
" schema: %s
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
dbName
,
g_args
.
num_of_tables
,
tblColsBuf
);
startMultiThreadCreateChildTable
(
tblColsBuf
,
g_Dbs
.
threadCountByCreateTbl
,
0
,
g_args
.
num_of_tables
,
g_Dbs
.
db
[
i
].
dbName
,
NULL
);
}
}
}
/*
...
...
@@ -3132,10 +3190,12 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
return
0
;
}
#if 0
int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) {
// TODO
return 0;
}
#endif
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
...
...
@@ -3520,9 +3580,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
// rows per table need be less than insert batch
if
(
g_args
.
interlace_rows
>
g_args
.
num_of_RPR
)
{
printf
(
"NOTICE: interlace rows value %
"
PRIu64
" > num_of_records_per_req %"
PRIu64
"
\n\n
"
,
printf
(
"NOTICE: interlace rows value %
u > num_of_records_per_req %u
\n\n
"
,
g_args
.
interlace_rows
,
g_args
.
num_of_RPR
);
printf
(
" interlace rows value will be set to num_of_records_per_req %
"
PRIu64
"
\n\n
"
,
printf
(
" interlace rows value will be set to num_of_records_per_req %
u
\n\n
"
,
g_args
.
num_of_RPR
);
prompt
();
g_args
.
interlace_rows
=
g_args
.
num_of_RPR
;
...
...
@@ -3753,36 +3813,40 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
// dbinfo
cJSON
*
stbName
=
cJSON_GetObjectItem
(
stbInfo
,
"name"
);
if
(
!
stbName
||
stbName
->
type
!=
cJSON_String
||
stbName
->
valuestring
==
NULL
)
{
if
(
!
stbName
||
stbName
->
type
!=
cJSON_String
||
stbName
->
valuestring
==
NULL
)
{
errorPrint
(
"%s() LN%d, failed to read json, stb name not found
\n
"
,
__func__
,
__LINE__
);
goto
PARSE_OVER
;
}
tstrncpy
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
sTblName
,
stbName
->
valuestring
,
MAX_TB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
sTblName
,
stbName
->
valuestring
,
MAX_TB_NAME_SIZE
);
cJSON
*
prefix
=
cJSON_GetObjectItem
(
stbInfo
,
"childtable_prefix"
);
if
(
!
prefix
||
prefix
->
type
!=
cJSON_String
||
prefix
->
valuestring
==
NULL
)
{
printf
(
"ERROR: failed to read json, childtable_prefix not found
\n
"
);
goto
PARSE_OVER
;
}
tstrncpy
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblPrefix
,
prefix
->
valuestring
,
MAX_DB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblPrefix
,
prefix
->
valuestring
,
MAX_DB_NAME_SIZE
);
cJSON
*
autoCreateTbl
=
cJSON_GetObjectItem
(
stbInfo
,
"auto_create_table"
);
// yes, no, null
cJSON
*
autoCreateTbl
=
cJSON_GetObjectItem
(
stbInfo
,
"auto_create_table"
);
if
(
autoCreateTbl
&&
autoCreateTbl
->
type
==
cJSON_String
&&
autoCreateTbl
->
valuestring
!=
NULL
)
{
if
(
0
==
strncasecmp
(
autoCreateTbl
->
valuestring
,
"yes"
,
3
))
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
=
AUTO_CREATE_SUBTBL
;
}
else
if
(
0
==
strncasecmp
(
autoCreateTbl
->
valuestring
,
"no"
,
2
))
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
=
PRE_CREATE_SUBTBL
;
}
else
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
=
PRE_CREATE_SUBTBL
;
}
if
((
0
==
strncasecmp
(
autoCreateTbl
->
valuestring
,
"yes"
,
3
))
&&
(
TBL_ALREADY_EXISTS
!=
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblExists
))
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
=
AUTO_CREATE_SUBTBL
;
}
else
if
(
0
==
strncasecmp
(
autoCreateTbl
->
valuestring
,
"no"
,
2
))
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
=
PRE_CREATE_SUBTBL
;
}
else
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
=
PRE_CREATE_SUBTBL
;
}
}
else
if
(
!
autoCreateTbl
)
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
=
PRE_CREATE_SUBTBL
;
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
=
PRE_CREATE_SUBTBL
;
}
else
{
printf
(
"ERROR: failed to read json, auto_create_table not found
\n
"
);
goto
PARSE_OVER
;
printf
(
"ERROR: failed to read json, auto_create_table not found
\n
"
);
goto
PARSE_OVER
;
}
cJSON
*
batchCreateTbl
=
cJSON_GetObjectItem
(
stbInfo
,
"batch_create_tbl_num"
);
...
...
@@ -3816,6 +3880,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto
PARSE_OVER
;
}
if
(
TBL_ALREADY_EXISTS
==
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblExists
)
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
autoCreateTable
=
PRE_CREATE_SUBTBL
;
}
cJSON
*
count
=
cJSON_GetObjectItem
(
stbInfo
,
"childtable_count"
);
if
(
!
count
||
count
->
type
!=
cJSON_Number
||
0
>=
count
->
valueint
)
{
errorPrint
(
"%s() LN%d, failed to read json, childtable_count input mistake
\n
"
,
...
...
@@ -3837,15 +3905,24 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto
PARSE_OVER
;
}
cJSON
*
insertMode
=
cJSON_GetObjectItem
(
stbInfo
,
"insert_mode"
);
// taosc , rest
if
(
insertMode
&&
insertMode
->
type
==
cJSON_String
&&
insertMode
->
valuestring
!=
NULL
)
{
tstrncpy
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertMode
,
insertMode
->
valuestring
,
MAX_DB_NAME_SIZE
);
}
else
if
(
!
insertMode
)
{
tstrncpy
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertMode
,
"taosc"
,
MAX_DB_NAME_SIZE
);
cJSON
*
stbIface
=
cJSON_GetObjectItem
(
stbInfo
,
"insert_mode"
);
// taosc , rest, stmt
if
(
stbIface
&&
stbIface
->
type
==
cJSON_String
&&
stbIface
->
valuestring
!=
NULL
)
{
if
(
0
==
strcasecmp
(
stbIface
->
valuestring
,
"taosc"
))
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
iface
=
TAOSC_IFACE
;
}
else
if
(
0
==
strcasecmp
(
stbIface
->
valuestring
,
"rest"
))
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
iface
=
REST_IFACE
;
}
else
if
(
0
==
strcasecmp
(
stbIface
->
valuestring
,
"stmt"
))
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
iface
=
STMT_IFACE
;
}
else
{
errorPrint
(
"%s() LN%d, failed to read json, insert_mode %s not recognized
\n
"
,
__func__
,
__LINE__
,
stbIface
->
valuestring
);
goto
PARSE_OVER
;
}
}
else
if
(
!
stbIface
)
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
iface
=
TAOSC_IFACE
;
}
else
{
printf
(
"ERROR:
failed to read json, insert_mode not found
\n
"
);
errorPrint
(
"%s"
,
"
failed to read json, insert_mode not found
\n
"
);
goto
PARSE_OVER
;
}
...
...
@@ -3864,7 +3941,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON
*
childTbl_offset
=
cJSON_GetObjectItem
(
stbInfo
,
"childtable_offset"
);
if
((
childTbl_offset
)
&&
(
g_Dbs
.
db
[
i
].
drop
!=
true
)
&&
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
childTblExists
==
TBL_ALREADY_EXISTS
))
{
if
(
childTbl_offset
->
type
!=
cJSON_Number
||
0
>
childTbl_offset
->
valueint
)
{
if
((
childTbl_offset
->
type
!=
cJSON_Number
)
||
(
0
>
childTbl_offset
->
valueint
))
{
printf
(
"ERROR: failed to read json, childtable_offset
\n
"
);
goto
PARSE_OVER
;
}
...
...
@@ -3920,7 +3998,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
}
cJSON
*
tagsFile
=
cJSON_GetObjectItem
(
stbInfo
,
"tags_file"
);
if
(
tagsFile
&&
tagsFile
->
type
==
cJSON_String
&&
tagsFile
->
valuestring
!=
NULL
)
{
if
((
tagsFile
&&
tagsFile
->
type
==
cJSON_String
)
&&
(
tagsFile
->
valuestring
!=
NULL
))
{
tstrncpy
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
tagsFile
,
tagsFile
->
valuestring
,
MAX_FILE_NAME_LEN
);
if
(
0
==
g_Dbs
.
db
[
i
].
superTbls
[
j
].
tagsFile
[
0
])
{
...
...
@@ -3936,9 +4015,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto
PARSE_OVER
;
}
cJSON
*
m
axSqlLen
=
cJSON_GetObjectItem
(
stbInfo
,
"max_sql_len"
);
if
(
maxSqlLen
&&
m
axSqlLen
->
type
==
cJSON_Number
)
{
int32_t
len
=
m
axSqlLen
->
valueint
;
cJSON
*
stbM
axSqlLen
=
cJSON_GetObjectItem
(
stbInfo
,
"max_sql_len"
);
if
(
stbMaxSqlLen
&&
stbM
axSqlLen
->
type
==
cJSON_Number
)
{
int32_t
len
=
stbM
axSqlLen
->
valueint
;
if
(
len
>
TSDB_MAX_ALLOWED_SQL_LEN
)
{
len
=
TSDB_MAX_ALLOWED_SQL_LEN
;
}
else
if
(
len
<
5
)
{
...
...
@@ -3948,7 +4027,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
}
else
if
(
!
maxSqlLen
)
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
maxSqlLen
=
g_args
.
max_sql_len
;
}
else
{
errorPrint
(
"%s() LN%d, failed to read json,
m
axSqlLen input mistake
\n
"
,
errorPrint
(
"%s() LN%d, failed to read json,
stbM
axSqlLen input mistake
\n
"
,
__func__
,
__LINE__
);
goto
PARSE_OVER
;
}
...
...
@@ -3970,24 +4049,25 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
*/
cJSON
*
i
nterlaceRows
=
cJSON_GetObjectItem
(
stbInfo
,
"interlace_rows"
);
if
(
interlaceRows
&&
i
nterlaceRows
->
type
==
cJSON_Number
)
{
if
(
i
nterlaceRows
->
valueint
<
0
)
{
cJSON
*
stbI
nterlaceRows
=
cJSON_GetObjectItem
(
stbInfo
,
"interlace_rows"
);
if
(
stbInterlaceRows
&&
stbI
nterlaceRows
->
type
==
cJSON_Number
)
{
if
(
stbI
nterlaceRows
->
valueint
<
0
)
{
errorPrint
(
"%s() LN%d, failed to read json, interlace rows input mistake
\n
"
,
__func__
,
__LINE__
);
goto
PARSE_OVER
;
}
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
=
i
nterlaceRows
->
valueint
;
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
=
stbI
nterlaceRows
->
valueint
;
// rows per table need be less than insert batch
if
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
>
g_args
.
num_of_RPR
)
{
printf
(
"NOTICE: db[%d].superTbl[%d]'s interlace rows value %"
PRIu64
" > num_of_records_per_req %"
PRIu64
"
\n\n
"
,
i
,
j
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
,
g_args
.
num_of_RPR
);
printf
(
" interlace rows value will be set to num_of_records_per_req %"
PRIu64
"
\n\n
"
,
printf
(
"NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > num_of_records_per_req %u
\n\n
"
,
i
,
j
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
,
g_args
.
num_of_RPR
);
printf
(
" interlace rows value will be set to num_of_records_per_req %u
\n\n
"
,
g_args
.
num_of_RPR
);
prompt
();
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
=
g_args
.
num_of_RPR
;
}
}
else
if
(
!
i
nterlaceRows
)
{
}
else
if
(
!
stbI
nterlaceRows
)
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
interlaceRows
=
0
;
// 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
}
else
{
errorPrint
(
...
...
@@ -4613,7 +4693,7 @@ static void prepareSampleData() {
static
void
postFreeResource
()
{
tmfclose
(
g_fpOfInsertResult
);
for
(
int
i
=
0
;
i
<
g_Dbs
.
dbCount
;
i
++
)
{
for
(
in
t
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
for
(
uint64_
t
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
if
(
0
!=
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
)
{
free
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
);
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
=
NULL
;
...
...
@@ -4661,16 +4741,22 @@ static int getRowDataFromSample(
return
dataLen
;
}
static
int64_t
generateRowData
(
char
*
recBuf
,
int64_t
timestamp
,
SSuperTable
*
stbInfo
)
{
static
int64_t
generateStbRowData
(
SSuperTable
*
stbInfo
,
char
*
recBuf
,
int64_t
timestamp
)
{
int64_t
dataLen
=
0
;
char
*
pstr
=
recBuf
;
int64_t
maxLen
=
MAX_DATA_SIZE
;
dataLen
+=
snprintf
(
pstr
+
dataLen
,
maxLen
-
dataLen
,
"(%"
PRId64
","
,
timestamp
);
dataLen
+=
snprintf
(
pstr
+
dataLen
,
maxLen
-
dataLen
,
"(%"
PRId64
","
,
timestamp
);
for
(
int
i
=
0
;
i
<
stbInfo
->
columnCount
;
i
++
)
{
if
((
0
==
strncasecmp
(
stbInfo
->
columns
[
i
].
dataType
,
"BINARY"
,
strlen
(
"BINARY"
)))
||
(
0
==
strncasecmp
(
stbInfo
->
columns
[
i
].
dataType
,
"NCHAR"
,
strlen
(
"NCHAR"
))))
{
if
((
0
==
strncasecmp
(
stbInfo
->
columns
[
i
].
dataType
,
"BINARY"
,
strlen
(
"BINARY"
)))
||
(
0
==
strncasecmp
(
stbInfo
->
columns
[
i
].
dataType
,
"NCHAR"
,
strlen
(
"NCHAR"
))))
{
if
(
stbInfo
->
columns
[
i
].
dataLen
>
TSDB_MAX_BINARY_LEN
)
{
errorPrint
(
"binary or nchar length overflow, max size:%u
\n
"
,
(
uint32_t
)
TSDB_MAX_BINARY_LEN
);
...
...
@@ -4686,23 +4772,23 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb
dataLen
+=
snprintf
(
pstr
+
dataLen
,
maxLen
-
dataLen
,
"
\'
%s
\'
,"
,
buf
);
tmfree
(
buf
);
}
else
if
(
0
==
strncasecmp
(
stbInfo
->
columns
[
i
].
dataType
,
"INT"
,
3
))
{
"INT"
,
strlen
(
"INT"
)
))
{
dataLen
+=
snprintf
(
pstr
+
dataLen
,
maxLen
-
dataLen
,
"%d,"
,
rand_int
());
}
else
if
(
0
==
strncasecmp
(
stbInfo
->
columns
[
i
].
dataType
,
"BIGINT"
,
6
))
{
"BIGINT"
,
strlen
(
"BIGINT"
)
))
{
dataLen
+=
snprintf
(
pstr
+
dataLen
,
maxLen
-
dataLen
,
"%"
PRId64
","
,
rand_bigint
());
}
else
if
(
0
==
strncasecmp
(
stbInfo
->
columns
[
i
].
dataType
,
"FLOAT"
,
5
))
{
"FLOAT"
,
strlen
(
"FLOAT"
)
))
{
dataLen
+=
snprintf
(
pstr
+
dataLen
,
maxLen
-
dataLen
,
"%f,"
,
rand_float
());
}
else
if
(
0
==
strncasecmp
(
stbInfo
->
columns
[
i
].
dataType
,
"DOUBLE"
,
6
))
{
"DOUBLE"
,
strlen
(
"DOUBLE"
)
))
{
dataLen
+=
snprintf
(
pstr
+
dataLen
,
maxLen
-
dataLen
,
"%f,"
,
rand_double
());
}
else
if
(
0
==
strncasecmp
(
stbInfo
->
columns
[
i
].
dataType
,
"SMALLINT"
,
8
))
{
"SMALLINT"
,
strlen
(
"SMALLINT"
)
))
{
dataLen
+=
snprintf
(
pstr
+
dataLen
,
maxLen
-
dataLen
,
"%d,"
,
rand_smallint
());
}
else
if
(
0
==
strncasecmp
(
stbInfo
->
columns
[
i
].
dataType
,
...
...
@@ -4732,46 +4818,38 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb
}
static
int64_t
generateData
(
char
*
recBuf
,
char
**
data_type
,
int
num_of_cols
,
int
64_t
timestamp
,
int
lenOfBinary
)
{
int64_t
timestamp
,
int
lenOfBinary
)
{
memset
(
recBuf
,
0
,
MAX_DATA_SIZE
);
char
*
pstr
=
recBuf
;
pstr
+=
sprintf
(
pstr
,
"(%"
PRId64
,
timestamp
);
int
c
=
0
;
for
(;
c
<
MAX_NUM_DATATYPE
;
c
++
)
{
if
(
data_type
[
c
]
==
NULL
)
{
break
;
}
}
if
(
0
==
c
)
{
perror
(
"data type error!"
);
exit
(
-
1
);
}
int
columnCount
=
g_args
.
num_of_CPR
;
for
(
int
i
=
0
;
i
<
c
;
i
++
)
{
if
(
strcasecmp
(
data_type
[
i
%
c
],
"TINYINT"
)
==
0
)
{
for
(
int
i
=
0
;
i
<
c
olumnCount
;
i
++
)
{
if
(
strcasecmp
(
data_type
[
i
%
c
olumnCount
],
"TINYINT"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",%d"
,
rand_tinyint
()
);
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"SMALLINT"
)
==
0
)
{
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
olumnCount
],
"SMALLINT"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",%d"
,
rand_smallint
());
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"INT"
)
==
0
)
{
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
olumnCount
],
"INT"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",%d"
,
rand_int
());
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"BIGINT"
)
==
0
)
{
}
else
if
(
strcasecmp
(
data_type
[
i
%
columnCount
],
"BIGINT"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",%"
PRId64
,
rand_bigint
());
}
else
if
(
strcasecmp
(
data_type
[
i
%
columnCount
],
"TIMESTAMP"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",%"
PRId64
,
rand_bigint
());
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"FLOAT"
)
==
0
)
{
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
olumnCount
],
"FLOAT"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",%10.4f"
,
rand_float
());
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"DOUBLE"
)
==
0
)
{
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
olumnCount
],
"DOUBLE"
)
==
0
)
{
double
t
=
rand_double
();
pstr
+=
sprintf
(
pstr
,
",%20.8f"
,
t
);
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"BOOL"
)
==
0
)
{
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
olumnCount
],
"BOOL"
)
==
0
)
{
bool
b
=
rand_bool
()
&
1
;
pstr
+=
sprintf
(
pstr
,
",%s"
,
b
?
"true"
:
"false"
);
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"BINARY"
)
==
0
)
{
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
olumnCount
],
"BINARY"
)
==
0
)
{
char
*
s
=
malloc
(
lenOfBinary
);
rand_string
(
s
,
lenOfBinary
);
pstr
+=
sprintf
(
pstr
,
",
\"
%s
\"
"
,
s
);
free
(
s
);
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
],
"NCHAR"
)
==
0
)
{
}
else
if
(
strcasecmp
(
data_type
[
i
%
c
olumnCount
],
"NCHAR"
)
==
0
)
{
char
*
s
=
malloc
(
lenOfBinary
);
rand_string
(
s
,
lenOfBinary
);
pstr
+=
sprintf
(
pstr
,
",
\"
%s
\"
"
,
s
);
...
...
@@ -4818,146 +4896,212 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return
0
;
}
static
int
64_t
execInsert
(
threadInfo
*
pThreadInfo
,
char
*
buffer
,
uint64
_t
k
)
static
int
32_t
execInsert
(
threadInfo
*
pThreadInfo
,
uint32
_t
k
)
{
in
t
affectedRows
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
int32_
t
affectedRows
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
verbosePrint
(
"[%d] %s() LN%d %s
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
buffer
);
if
(
superTblInfo
)
{
if
(
0
==
strncasecmp
(
superTblInfo
->
insertMode
,
"taosc"
,
strlen
(
"taosc"
)))
{
affectedRows
=
queryDbExec
(
pThreadInfo
->
taos
,
buffer
,
INSERT_TYPE
,
false
);
}
else
if
(
0
==
strncasecmp
(
superTblInfo
->
insertMode
,
"rest"
,
strlen
(
"rest"
)))
{
if
(
0
!=
postProceSql
(
g_Dbs
.
host
,
&
g_Dbs
.
serv_addr
,
g_Dbs
.
port
,
buffer
,
NULL
/* not set result file */
))
{
affectedRows
=
-
1
;
printf
(
"========restful return fail, threadID[%d]
\n
"
,
pThreadInfo
->
threadID
);
}
else
{
affectedRows
=
k
;
}
}
else
{
errorPrint
(
"%s() LN%d: unknown insert mode: %s
\n
"
,
__func__
,
__LINE__
,
superTblInfo
->
insertMode
);
affectedRows
=
0
;
verbosePrint
(
"[%d] %s() LN%d %s
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
pThreadInfo
->
buffer
);
uint16_t
iface
;
if
(
superTblInfo
)
iface
=
superTblInfo
->
iface
;
else
iface
=
g_args
.
iface
;
debugPrint
(
"[%d] %s() LN%d %s
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
(
g_args
.
iface
==
TAOSC_IFACE
)
?
"taosc"
:
(
g_args
.
iface
==
REST_IFACE
)
?
"rest"
:
"stmt"
);
switch
(
iface
)
{
case
TAOSC_IFACE
:
affectedRows
=
queryDbExec
(
pThreadInfo
->
taos
,
pThreadInfo
->
buffer
,
INSERT_TYPE
,
false
);
break
;
case
REST_IFACE
:
if
(
0
!=
postProceSql
(
g_Dbs
.
host
,
&
g_Dbs
.
serv_addr
,
g_Dbs
.
port
,
pThreadInfo
->
buffer
,
NULL
/* not set result file */
))
{
affectedRows
=
-
1
;
printf
(
"========restful return fail, threadID[%d]
\n
"
,
pThreadInfo
->
threadID
);
}
else
{
affectedRows
=
k
;
}
break
;
case
STMT_IFACE
:
debugPrint
(
"%s() LN%d, stmt=%p"
,
__func__
,
__LINE__
,
pThreadInfo
->
stmt
);
if
(
0
!=
taos_stmt_execute
(
pThreadInfo
->
stmt
))
{
errorPrint
(
"%s() LN%d, failied to execute insert statement
\n
"
,
__func__
,
__LINE__
);
exit
(
-
1
);
}
affectedRows
=
k
;
break
;
default:
errorPrint
(
"%s() LN%d: unknown insert mode: %d
\n
"
,
__func__
,
__LINE__
,
superTblInfo
->
iface
);
affectedRows
=
0
;
}
}
else
{
affectedRows
=
queryDbExec
(
pThreadInfo
->
taos
,
buffer
,
INSERT_TYPE
,
false
);
}
return
affectedRows
;
return
affectedRows
;
}
static
void
getTableName
(
char
*
pTblName
,
threadInfo
*
pThreadInfo
,
uint64_t
tableSeq
)
{
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
if
((
superTblInfo
)
&&
(
AUTO_CREATE_SUBTBL
!=
superTblInfo
->
autoCreateTable
))
{
if
(
superTblInfo
->
childTblLimit
>
0
)
{
snprintf
(
pTblName
,
TSDB_TABLE_NAME_LEN
,
"%s"
,
superTblInfo
->
childTblName
+
(
tableSeq
-
superTblInfo
->
childTblOffset
)
*
TSDB_TABLE_NAME_LEN
);
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
if
(
superTblInfo
)
{
if
(
AUTO_CREATE_SUBTBL
!=
superTblInfo
->
autoCreateTable
)
{
if
(
superTblInfo
->
childTblLimit
>
0
)
{
snprintf
(
pTblName
,
TSDB_TABLE_NAME_LEN
,
"%s"
,
superTblInfo
->
childTblName
+
(
tableSeq
-
superTblInfo
->
childTblOffset
)
*
TSDB_TABLE_NAME_LEN
);
}
else
{
verbosePrint
(
"[%d] %s() LN%d: from=%"
PRIu64
" count=%"
PRId64
" seq=%"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
pThreadInfo
->
start_table_from
,
pThreadInfo
->
ntables
,
tableSeq
);
snprintf
(
pTblName
,
TSDB_TABLE_NAME_LEN
,
"%s"
,
superTblInfo
->
childTblName
+
tableSeq
*
TSDB_TABLE_NAME_LEN
);
}
}
else
{
snprintf
(
pTblName
,
TSDB_TABLE_NAME_LEN
,
"%s%"
PRIu64
""
,
superTblInfo
->
childTblPrefix
,
tableSeq
);
}
}
else
{
verbosePrint
(
"[%d] %s() LN%d: from=%"
PRIu64
" count=%"
PRId64
" seq=%"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
pThreadInfo
->
start_table_from
,
pThreadInfo
->
ntables
,
tableSeq
);
snprintf
(
pTblName
,
TSDB_TABLE_NAME_LEN
,
"%s"
,
superTblInfo
->
childTblName
+
tableSeq
*
TSDB_TABLE_NAME_LEN
);
snprintf
(
pTblName
,
TSDB_TABLE_NAME_LEN
,
"%s%"
PRIu64
""
,
g_args
.
tb_prefix
,
tableSeq
);
}
}
else
{
snprintf
(
pTblName
,
TSDB_TABLE_NAME_LEN
,
"%s%"
PRIu64
""
,
g_args
.
tb_prefix
,
tableSeq
);
}
}
static
int64_t
generateDataTail
(
SSuperTable
*
superTblInfo
,
uint64_t
batch
,
char
*
buffer
,
int64_t
remainderBufLen
,
int64_t
insertRows
,
uint64_t
startFrom
,
int64_t
startTime
,
int64_t
*
pSamplePos
,
int64_t
*
dataLen
)
{
uint64_t
len
=
0
;
uint32_t
ncols_per_record
=
1
;
// count first col ts
static
int32_t
generateDataTailWithoutStb
(
uint32_t
batch
,
char
*
buffer
,
int64_t
remainderBufLen
,
int64_t
insertRows
,
uint64_t
recordFrom
,
int64_t
startTime
,
/* int64_t *pSamplePos, */
int64_t
*
dataLen
)
{
uint64_t
len
=
0
;
char
*
pstr
=
buffer
;
if
(
superTblInfo
==
NULL
)
{
uint32_t
datatypeSeq
=
0
;
while
(
g_args
.
datatype
[
datatypeSeq
])
{
datatypeSeq
++
;
ncols_per_record
++
;
verbosePrint
(
"%s() LN%d batch=%d
\n
"
,
__func__
,
__LINE__
,
batch
);
int32_t
k
=
0
;
for
(
k
=
0
;
k
<
batch
;)
{
char
data
[
MAX_DATA_SIZE
];
memset
(
data
,
0
,
MAX_DATA_SIZE
);
int64_t
retLen
=
0
;
char
**
data_type
=
g_args
.
datatype
;
int
lenOfBinary
=
g_args
.
len_of_binary
;
retLen
=
generateData
(
data
,
data_type
,
startTime
+
getTSRandTail
(
(
int64_t
)
DEFAULT_TIMESTAMP_STEP
,
k
,
g_args
.
disorderRatio
,
g_args
.
disorderRange
),
lenOfBinary
);
if
(
len
>
remainderBufLen
)
break
;
pstr
+=
sprintf
(
pstr
,
"%s"
,
data
);
k
++
;
len
+=
retLen
;
remainderBufLen
-=
retLen
;
verbosePrint
(
"%s() LN%d len=%"
PRIu64
" k=%d
\n
buffer=%s
\n
"
,
__func__
,
__LINE__
,
len
,
k
,
buffer
);
recordFrom
++
;
if
(
recordFrom
>=
insertRows
)
{
break
;
}
}
verbosePrint
(
"%s() LN%d batch=%"
PRIu64
"
\n
"
,
__func__
,
__LINE__
,
batch
);
*
dataLen
=
len
;
return
k
;
}
static
int64_t
getTSRandTail
(
int64_t
timeStampStep
,
int32_t
seq
,
int
disorderRatio
,
int
disorderRange
)
{
int64_t
randTail
=
timeStampStep
*
seq
;
if
(
disorderRatio
>
0
)
{
int
rand_num
=
taosRandom
()
%
100
;
if
(
rand_num
<
disorderRatio
)
{
randTail
=
(
randTail
+
(
taosRandom
()
%
disorderRange
+
1
))
*
(
-
1
);
debugPrint
(
"rand data generated, back %"
PRId64
"
\n
"
,
randTail
);
}
}
return
randTail
;
}
static
int32_t
generateStbDataTail
(
SSuperTable
*
superTblInfo
,
uint32_t
batch
,
char
*
buffer
,
int64_t
remainderBufLen
,
int64_t
insertRows
,
uint64_t
recordFrom
,
int64_t
startTime
,
int64_t
*
pSamplePos
,
int64_t
*
dataLen
)
{
uint64_t
len
=
0
;
char
*
pstr
=
buffer
;
bool
tsRand
;
if
((
superTblInfo
)
&&
(
0
==
strncasecmp
(
superTblInfo
->
dataSource
,
"rand"
,
strlen
(
"rand"
))))
{
tsRand
=
true
;
}
else
{
tsRand
=
false
;
if
(
0
==
strncasecmp
(
superTblInfo
->
dataSource
,
"rand"
,
strlen
(
"rand"
)))
{
tsRand
=
true
;
}
else
{
tsRand
=
false
;
}
verbosePrint
(
"%s() LN%d batch=%u
\n
"
,
__func__
,
__LINE__
,
batch
);
uint64
_t
k
=
0
;
int32
_t
k
=
0
;
for
(
k
=
0
;
k
<
batch
;)
{
char
data
[
MAX_DATA_SIZE
];
memset
(
data
,
0
,
MAX_DATA_SIZE
);
int64_t
retLen
=
0
;
if
(
superTblInfo
)
{
if
(
tsRand
)
{
retLen
=
generateRowData
(
data
,
startTime
+
getTSRandTail
(
superTblInfo
->
timeStampStep
,
k
,
superTblInfo
->
disorderRatio
,
superTblInfo
->
disorderRange
),
superTblInfo
);
}
else
{
retLen
=
getRowDataFromSample
(
data
,
remainderBufLen
,
startTime
+
superTblInfo
->
timeStampStep
*
k
,
superTblInfo
,
pSamplePos
);
}
if
(
retLen
>
remainderBufLen
)
{
break
;
}
pstr
+=
snprintf
(
pstr
,
retLen
+
1
,
"%s"
,
data
);
k
++
;
len
+=
retLen
;
remainderBufLen
-=
retLen
;
}
else
{
char
**
data_type
=
g_args
.
datatype
;
int
lenOfBinary
=
g_args
.
len_of_binary
;
retLen
=
generateData
(
data
,
data_type
,
ncols_per_record
,
if
(
tsRand
)
{
retLen
=
generateStbRowData
(
superTblInfo
,
data
,
startTime
+
getTSRandTail
(
DEFAULT_TIMESTAMP_STEP
,
k
,
g_args
.
disorderRatio
,
g_args
.
disorderRange
),
lenOfBinary
);
if
(
len
>
remainderBufLen
)
break
;
superTblInfo
->
timeStampStep
,
k
,
superTblInfo
->
disorderRatio
,
superTblInfo
->
disorderRange
)
);
}
else
{
retLen
=
getRowDataFromSample
(
data
,
remainderBufLen
,
startTime
+
superTblInfo
->
timeStampStep
*
k
,
superTblInfo
,
pSamplePos
);
}
pstr
+=
sprintf
(
pstr
,
"%s"
,
data
);
k
++
;
len
+=
retLen
;
remainderBufLen
-=
retLen
;
if
(
retLen
>
remainderBufLen
)
{
break
;
}
verbosePrint
(
"%s() LN%d len=%"
PRIu64
" k=%"
PRIu64
"
\n
buffer=%s
\n
"
,
pstr
+=
snprintf
(
pstr
,
retLen
+
1
,
"%s"
,
data
);
k
++
;
len
+=
retLen
;
remainderBufLen
-=
retLen
;
verbosePrint
(
"%s() LN%d len=%"
PRIu64
" k=%u
\n
buffer=%s
\n
"
,
__func__
,
__LINE__
,
len
,
k
,
buffer
);
start
From
++
;
record
From
++
;
if
(
start
From
>=
insertRows
)
{
if
(
record
From
>=
insertRows
)
{
break
;
}
}
...
...
@@ -4966,17 +5110,42 @@ static int64_t generateDataTail(
return
k
;
}
static
int
generateSQLHead
(
char
*
tableName
,
int32_t
tableSeq
,
threadInfo
*
pThreadInfo
,
SSuperTable
*
superTblInfo
,
static
int
generateSQLHeadWithoutStb
(
char
*
tableName
,
char
*
dbName
,
char
*
buffer
,
int
remainderBufLen
)
{
int
len
;
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into ..
char
headBuf
[
HEAD_BUFF_LEN
];
if
(
superTblInfo
)
{
if
(
AUTO_CREATE_SUBTBL
==
superTblInfo
->
autoCreateTable
)
{
len
=
snprintf
(
headBuf
,
HEAD_BUFF_LEN
,
"%s.%s values"
,
dbName
,
tableName
);
if
(
len
>
remainderBufLen
)
return
-
1
;
tstrncpy
(
buffer
,
headBuf
,
len
+
1
);
return
len
;
}
static
int
generateStbSQLHead
(
SSuperTable
*
superTblInfo
,
char
*
tableName
,
int32_t
tableSeq
,
char
*
dbName
,
char
*
buffer
,
int
remainderBufLen
)
{
int
len
;
char
headBuf
[
HEAD_BUFF_LEN
];
if
((
AUTO_CREATE_SUBTBL
==
superTblInfo
->
autoCreateTable
)
&&
(
TBL_ALREADY_EXISTS
!=
superTblInfo
->
childTblExists
))
{
char
*
tagsValBuf
=
NULL
;
if
(
0
==
superTblInfo
->
tagSource
)
{
tagsValBuf
=
generateTagVaulesForStb
(
superTblInfo
,
tableSeq
);
...
...
@@ -4995,9 +5164,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
headBuf
,
HEAD_BUFF_LEN
,
"%s.%s using %s.%s tags %s values"
,
pThreadInfo
->
db_n
ame
,
dbN
ame
,
tableName
,
pThreadInfo
->
db_n
ame
,
dbN
ame
,
superTblInfo
->
sTblName
,
tagsValBuf
);
tmfree
(
tagsValBuf
);
...
...
@@ -5006,22 +5175,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
headBuf
,
HEAD_BUFF_LEN
,
"%s.%s values"
,
pThreadInfo
->
db_n
ame
,
dbN
ame
,
tableName
);
}
else
{
len
=
snprintf
(
headBuf
,
HEAD_BUFF_LEN
,
"%s.%s values"
,
pThreadInfo
->
db_name
,
tableName
);
}
}
else
{
len
=
snprintf
(
headBuf
,
HEAD_BUFF_LEN
,
"%s.%s values"
,
pThreadInfo
->
db_name
,
dbName
,
tableName
);
}
...
...
@@ -5033,8 +5194,11 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
return
len
;
}
static
int64_t
generateInterlaceDataBuffer
(
char
*
tableName
,
uint64_t
batchPerTbl
,
uint64_t
i
,
uint64_t
batchPerTblTimes
,
static
int32_t
generateStbInterlaceData
(
SSuperTable
*
superTblInfo
,
char
*
tableName
,
uint32_t
batchPerTbl
,
uint64_t
i
,
uint32_t
batchPerTblTimes
,
uint64_t
tableSeq
,
threadInfo
*
pThreadInfo
,
char
*
buffer
,
int64_t
insertRows
,
...
...
@@ -5043,10 +5207,11 @@ static int64_t generateInterlaceDataBuffer(
{
assert
(
buffer
);
char
*
pstr
=
buffer
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
int
headLen
=
generateSQLHead
(
tableName
,
tableSeq
,
pThreadInfo
,
superTblInfo
,
pstr
,
*
pRemainderBufLen
);
int
headLen
=
generateStbSQLHead
(
superTblInfo
,
tableName
,
tableSeq
,
pThreadInfo
->
db_name
,
pstr
,
*
pRemainderBufLen
);
if
(
headLen
<=
0
)
{
return
0
;
...
...
@@ -5060,29 +5225,25 @@ static int64_t generateInterlaceDataBuffer(
int64_t
dataLen
=
0
;
verbosePrint
(
"[%d] %s() LN%d i=%"
PRIu64
" batchPerTblTimes=%
"
PRIu64
" batchPerTbl = %"
PRIu64
"
\n
"
,
verbosePrint
(
"[%d] %s() LN%d i=%"
PRIu64
" batchPerTblTimes=%
u batchPerTbl = %u
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
i
,
batchPerTblTimes
,
batchPerTbl
);
if
(
superTblInfo
)
{
if
(
0
==
strncasecmp
(
superTblInfo
->
startTimestamp
,
"now"
,
3
))
{
if
(
0
==
strncasecmp
(
superTblInfo
->
startTimestamp
,
"now"
,
3
))
{
startTime
=
taosGetTimestamp
(
pThreadInfo
->
time_precision
);
}
}
else
{
startTime
=
1500000000000
;
}
int
64_t
k
=
generate
DataTail
(
superTblInfo
,
batchPerTbl
,
pstr
,
*
pRemainderBufLen
,
insertRows
,
0
,
startTime
,
&
(
pThreadInfo
->
samplePos
),
&
dataLen
);
int
32_t
k
=
generateStb
DataTail
(
superTblInfo
,
batchPerTbl
,
pstr
,
*
pRemainderBufLen
,
insertRows
,
0
,
startTime
,
&
(
pThreadInfo
->
samplePos
),
&
dataLen
);
if
(
k
==
batchPerTbl
)
{
pstr
+=
dataLen
;
*
pRemainderBufLen
-=
dataLen
;
}
else
{
debugPrint
(
"%s() LN%d, generated data tail: %
"
PRIu64
", not equal batch per table: %"
PRIu64
"
\n
"
,
debugPrint
(
"%s() LN%d, generated data tail: %
u, not equal batch per table: %u
\n
"
,
__func__
,
__LINE__
,
k
,
batchPerTbl
);
pstr
-=
headLen
;
pstr
[
0
]
=
'\0'
;
...
...
@@ -5092,50 +5253,361 @@ static int64_t generateInterlaceDataBuffer(
return
k
;
}
static
int64_t
getTSRandTail
(
int64_t
timeStampStep
,
int32_t
seq
,
int
disorderRatio
,
int
disorderRange
)
static
int64_t
generateInterlaceDataWithoutStb
(
char
*
tableName
,
uint32_t
batch
,
uint64_t
tableSeq
,
char
*
dbName
,
char
*
buffer
,
int64_t
insertRows
,
int64_t
startTime
,
uint64_t
*
pRemainderBufLen
)
{
int64_t
randTail
=
timeStampStep
*
seq
;
if
(
disorderRatio
>
0
)
{
int
rand_num
=
taosRandom
()
%
100
;
if
(
rand_num
<
disorderRatio
)
{
randTail
=
(
randTail
+
(
taosRandom
()
%
disorderRange
+
1
))
*
(
-
1
);
debugPrint
(
"rand data generated, back %"
PRId64
"
\n
"
,
randTail
);
assert
(
buffer
);
char
*
pstr
=
buffer
;
int
headLen
=
generateSQLHeadWithoutStb
(
tableName
,
dbName
,
pstr
,
*
pRemainderBufLen
);
if
(
headLen
<=
0
)
{
return
0
;
}
pstr
+=
headLen
;
*
pRemainderBufLen
-=
headLen
;
int64_t
dataLen
=
0
;
int32_t
k
=
generateDataTailWithoutStb
(
batch
,
pstr
,
*
pRemainderBufLen
,
insertRows
,
0
,
startTime
,
&
dataLen
);
if
(
k
==
batch
)
{
pstr
+=
dataLen
;
*
pRemainderBufLen
-=
dataLen
;
}
else
{
debugPrint
(
"%s() LN%d, generated data tail: %d, not equal batch per table: %u
\n
"
,
__func__
,
__LINE__
,
k
,
batch
);
pstr
-=
headLen
;
pstr
[
0
]
=
'\0'
;
k
=
0
;
}
return
k
;
}
#if STMT_IFACE_ENABLED == 1
static
int32_t
prepareStmtBindArrayByType
(
TAOS_BIND
*
bind
,
char
*
dataType
,
int32_t
dataLen
,
char
**
ptr
)
{
if
(
0
==
strncasecmp
(
dataType
,
"BINARY"
,
strlen
(
"BINARY"
)))
{
if
(
dataLen
>
TSDB_MAX_BINARY_LEN
)
{
errorPrint
(
"binary length overflow, max size:%u
\n
"
,
(
uint32_t
)
TSDB_MAX_BINARY_LEN
);
return
-
1
;
}
char
*
bind_binary
=
(
char
*
)
*
ptr
;
rand_string
(
bind_binary
,
dataLen
);
bind
->
buffer_type
=
TSDB_DATA_TYPE_BINARY
;
bind
->
buffer_length
=
dataLen
;
bind
->
buffer
=
bind_binary
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
*
ptr
+=
bind
->
buffer_length
;
}
else
if
(
0
==
strncasecmp
(
dataType
,
"NCHAR"
,
strlen
(
"NCHAR"
)))
{
if
(
dataLen
>
TSDB_MAX_BINARY_LEN
)
{
errorPrint
(
"nchar length overflow, max size:%u
\n
"
,
(
uint32_t
)
TSDB_MAX_BINARY_LEN
);
return
-
1
;
}
char
*
bind_nchar
=
(
char
*
)
*
ptr
;
rand_string
(
bind_nchar
,
dataLen
);
bind
->
buffer_type
=
TSDB_DATA_TYPE_NCHAR
;
bind
->
buffer_length
=
strlen
(
bind_nchar
);
bind
->
buffer
=
bind_nchar
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
*
ptr
+=
bind
->
buffer_length
;
}
else
if
(
0
==
strncasecmp
(
dataType
,
"INT"
,
strlen
(
"INT"
)))
{
int32_t
*
bind_int
=
(
int32_t
*
)
*
ptr
;
*
bind_int
=
rand_int
();
bind
->
buffer_type
=
TSDB_DATA_TYPE_INT
;
bind
->
buffer_length
=
sizeof
(
int32_t
);
bind
->
buffer
=
bind_int
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
*
ptr
+=
bind
->
buffer_length
;
}
else
if
(
0
==
strncasecmp
(
dataType
,
"BIGINT"
,
strlen
(
"BIGINT"
)))
{
int64_t
*
bind_bigint
=
(
int64_t
*
)
*
ptr
;
*
bind_bigint
=
rand_bigint
();
bind
->
buffer_type
=
TSDB_DATA_TYPE_BIGINT
;
bind
->
buffer_length
=
sizeof
(
int64_t
);
bind
->
buffer
=
bind_bigint
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
*
ptr
+=
bind
->
buffer_length
;
}
else
if
(
0
==
strncasecmp
(
dataType
,
"FLOAT"
,
strlen
(
"FLOAT"
)))
{
float
*
bind_float
=
(
float
*
)
*
ptr
;
*
bind_float
=
rand_float
();
bind
->
buffer_type
=
TSDB_DATA_TYPE_FLOAT
;
bind
->
buffer_length
=
sizeof
(
float
);
bind
->
buffer
=
bind_float
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
*
ptr
+=
bind
->
buffer_length
;
}
else
if
(
0
==
strncasecmp
(
dataType
,
"DOUBLE"
,
strlen
(
"DOUBLE"
)))
{
double
*
bind_double
=
(
double
*
)
*
ptr
;
*
bind_double
=
rand_double
();
bind
->
buffer_type
=
TSDB_DATA_TYPE_DOUBLE
;
bind
->
buffer_length
=
sizeof
(
double
);
bind
->
buffer
=
bind_double
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
*
ptr
+=
bind
->
buffer_length
;
}
else
if
(
0
==
strncasecmp
(
dataType
,
"SMALLINT"
,
strlen
(
"SMALLINT"
)))
{
int16_t
*
bind_smallint
=
(
int16_t
*
)
*
ptr
;
*
bind_smallint
=
rand_smallint
();
bind
->
buffer_type
=
TSDB_DATA_TYPE_SMALLINT
;
bind
->
buffer_length
=
sizeof
(
int16_t
);
bind
->
buffer
=
bind_smallint
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
*
ptr
+=
bind
->
buffer_length
;
}
else
if
(
0
==
strncasecmp
(
dataType
,
"TINYINT"
,
strlen
(
"TINYINT"
)))
{
int8_t
*
bind_tinyint
=
(
int8_t
*
)
*
ptr
;
*
bind_tinyint
=
rand_tinyint
();
bind
->
buffer_type
=
TSDB_DATA_TYPE_TINYINT
;
bind
->
buffer_length
=
sizeof
(
int8_t
);
bind
->
buffer
=
bind_tinyint
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
*
ptr
+=
bind
->
buffer_length
;
}
else
if
(
0
==
strncasecmp
(
dataType
,
"BOOL"
,
strlen
(
"BOOL"
)))
{
int8_t
*
bind_bool
=
(
int8_t
*
)
*
ptr
;
*
bind_bool
=
rand_bool
();
bind
->
buffer_type
=
TSDB_DATA_TYPE_BOOL
;
bind
->
buffer_length
=
sizeof
(
int8_t
);
bind
->
buffer
=
bind_bool
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
*
ptr
+=
bind
->
buffer_length
;
}
else
if
(
0
==
strncasecmp
(
dataType
,
"TIMESTAMP"
,
strlen
(
"TIMESTAMP"
)))
{
int64_t
*
bind_ts2
=
(
int64_t
*
)
*
ptr
;
*
bind_ts2
=
rand_bigint
();
bind
->
buffer_type
=
TSDB_DATA_TYPE_TIMESTAMP
;
bind
->
buffer_length
=
sizeof
(
int64_t
);
bind
->
buffer
=
bind_ts2
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
*
ptr
+=
bind
->
buffer_length
;
}
else
{
errorPrint
(
"No support data type: %s
\n
"
,
dataType
);
return
-
1
;
}
return
randTail
;
return
0
;
}
static
int64_t
generateProgressiveDataBuffer
(
static
int32_t
prepareStmtWithoutStb
(
TAOS_STMT
*
stmt
,
char
*
tableName
,
int64_t
tableSeq
,
threadInfo
*
pThreadInfo
,
char
*
buffer
,
uint32_t
batch
,
int64_t
insertRows
,
uint64_t
startFrom
,
int64_t
startTime
,
int64_t
*
pSamplePos
,
int64_t
*
pRemainderBufLen
)
int64_t
recordFrom
,
int64_t
startTime
)
{
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
int
ret
=
taos_stmt_set_tbname
(
stmt
,
tableName
);
if
(
ret
!=
0
)
{
errorPrint
(
"failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s
\n
"
,
tableName
,
ret
,
taos_errstr
(
NULL
));
return
ret
;
}
int
ncols_per_record
=
1
;
// count first col ts
char
**
data_type
=
g_args
.
datatype
;
if
(
superTblInfo
==
NULL
)
{
int
datatypeSeq
=
0
;
while
(
g_args
.
datatype
[
datatypeSeq
])
{
datatypeSeq
++
;
ncols_per_record
++
;
char
*
bindArray
=
malloc
(
sizeof
(
TAOS_BIND
)
*
(
g_args
.
num_of_CPR
+
1
));
if
(
bindArray
==
NULL
)
{
errorPrint
(
"Failed to allocate %d bind params
\n
"
,
(
g_args
.
num_of_CPR
+
1
));
return
-
1
;
}
int32_t
k
=
0
;
for
(
k
=
0
;
k
<
batch
;)
{
/* columnCount + 1 (ts) */
char
data
[
MAX_DATA_SIZE
];
memset
(
data
,
0
,
MAX_DATA_SIZE
);
char
*
ptr
=
data
;
TAOS_BIND
*
bind
=
(
TAOS_BIND
*
)(
bindArray
+
0
);
int64_t
*
bind_ts
;
bind_ts
=
(
int64_t
*
)
ptr
;
bind
->
buffer_type
=
TSDB_DATA_TYPE_TIMESTAMP
;
*
bind_ts
=
startTime
+
getTSRandTail
(
(
int64_t
)
DEFAULT_TIMESTAMP_STEP
,
k
,
g_args
.
disorderRatio
,
g_args
.
disorderRange
);
bind
->
buffer_length
=
sizeof
(
int64_t
);
bind
->
buffer
=
bind_ts
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
ptr
+=
bind
->
buffer_length
;
for
(
int
i
=
0
;
i
<
g_args
.
num_of_CPR
;
i
++
)
{
bind
=
(
TAOS_BIND
*
)((
char
*
)
bindArray
+
(
sizeof
(
TAOS_BIND
)
*
(
i
+
1
)));
if
(
-
1
==
prepareStmtBindArrayByType
(
bind
,
data_type
[
i
],
g_args
.
len_of_binary
,
&
ptr
))
{
return
-
1
;
}
}
taos_stmt_bind_param
(
stmt
,
(
TAOS_BIND
*
)
bindArray
);
// if msg > 3MB, break
taos_stmt_add_batch
(
stmt
);
k
++
;
recordFrom
++
;
if
(
recordFrom
>=
insertRows
)
{
break
;
}
}
return
k
;
}
static
int32_t
prepareStbStmt
(
SSuperTable
*
stbInfo
,
TAOS_STMT
*
stmt
,
char
*
tableName
,
uint32_t
batch
,
uint64_t
insertRows
,
uint64_t
recordFrom
,
int64_t
startTime
,
char
*
buffer
)
{
int
ret
=
taos_stmt_set_tbname
(
stmt
,
tableName
);
if
(
ret
!=
0
)
{
errorPrint
(
"failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s
\n
"
,
tableName
,
ret
,
taos_errstr
(
NULL
));
return
ret
;
}
char
*
bindArray
=
malloc
(
sizeof
(
TAOS_BIND
)
*
(
stbInfo
->
columnCount
+
1
));
if
(
bindArray
==
NULL
)
{
errorPrint
(
"Failed to allocate %d bind params
\n
"
,
(
stbInfo
->
columnCount
+
1
));
return
-
1
;
}
bool
tsRand
;
if
(
0
==
strncasecmp
(
stbInfo
->
dataSource
,
"rand"
,
strlen
(
"rand"
)))
{
tsRand
=
true
;
}
else
{
tsRand
=
false
;
}
}
uint32_t
k
;
for
(
k
=
0
;
k
<
batch
;)
{
/* columnCount + 1 (ts) */
char
data
[
MAX_DATA_SIZE
];
memset
(
data
,
0
,
MAX_DATA_SIZE
);
char
*
ptr
=
data
;
TAOS_BIND
*
bind
=
(
TAOS_BIND
*
)(
bindArray
+
0
);
int64_t
*
bind_ts
;
bind_ts
=
(
int64_t
*
)
ptr
;
bind
->
buffer_type
=
TSDB_DATA_TYPE_TIMESTAMP
;
if
(
tsRand
)
{
*
bind_ts
=
startTime
+
getTSRandTail
(
stbInfo
->
timeStampStep
,
k
,
stbInfo
->
disorderRatio
,
stbInfo
->
disorderRange
);
}
else
{
*
bind_ts
=
startTime
+
stbInfo
->
timeStampStep
*
k
;
}
bind
->
buffer_length
=
sizeof
(
int64_t
);
bind
->
buffer
=
bind_ts
;
bind
->
length
=
&
bind
->
buffer_length
;
bind
->
is_null
=
NULL
;
ptr
+=
bind
->
buffer_length
;
for
(
int
i
=
0
;
i
<
stbInfo
->
columnCount
;
i
++
)
{
bind
=
(
TAOS_BIND
*
)((
char
*
)
bindArray
+
(
sizeof
(
TAOS_BIND
)
*
(
i
+
1
)));
if
(
-
1
==
prepareStmtBindArrayByType
(
bind
,
stbInfo
->
columns
[
i
].
dataType
,
stbInfo
->
columns
[
i
].
dataLen
,
&
ptr
))
{
return
-
1
;
}
}
taos_stmt_bind_param
(
stmt
,
(
TAOS_BIND
*
)
bindArray
);
// if msg > 3MB, break
taos_stmt_add_batch
(
stmt
);
k
++
;
recordFrom
++
;
if
(
recordFrom
>=
insertRows
)
{
break
;
}
}
return
k
;
}
#endif
static
int32_t
generateStbProgressiveData
(
SSuperTable
*
superTblInfo
,
char
*
tableName
,
int64_t
tableSeq
,
char
*
dbName
,
char
*
buffer
,
int64_t
insertRows
,
uint64_t
recordFrom
,
int64_t
startTime
,
int64_t
*
pSamplePos
,
int64_t
*
pRemainderBufLen
)
{
assert
(
buffer
!=
NULL
);
char
*
pstr
=
buffer
;
int64_t
k
=
0
;
memset
(
buffer
,
0
,
*
pRemainderBufLen
);
int64_t
headLen
=
generateSQLHead
(
tableName
,
tableSeq
,
pThreadInfo
,
superTblInfo
,
int64_t
headLen
=
generateStbSQLHead
(
superTblInfo
,
tableName
,
tableSeq
,
dbName
,
buffer
,
*
pRemainderBufLen
);
if
(
headLen
<=
0
)
{
...
...
@@ -5145,29 +5617,64 @@ static int64_t generateProgressiveDataBuffer(
*
pRemainderBufLen
-=
headLen
;
int64_t
dataLen
;
k
=
generateDataTail
(
superTblInfo
,
g_args
.
num_of_RPR
,
pstr
,
*
pRemainderBufLen
,
insertRows
,
startFrom
,
return
generateStbDataTail
(
superTblInfo
,
g_args
.
num_of_RPR
,
pstr
,
*
pRemainderBufLen
,
insertRows
,
recordFrom
,
startTime
,
pSamplePos
,
&
dataLen
);
}
return
k
;
static
int32_t
generateProgressiveDataWithoutStb
(
char
*
tableName
,
/* int64_t tableSeq, */
threadInfo
*
pThreadInfo
,
char
*
buffer
,
int64_t
insertRows
,
uint64_t
recordFrom
,
int64_t
startTime
,
/*int64_t *pSamplePos, */
int64_t
*
pRemainderBufLen
)
{
assert
(
buffer
!=
NULL
);
char
*
pstr
=
buffer
;
memset
(
buffer
,
0
,
*
pRemainderBufLen
);
int64_t
headLen
=
generateSQLHeadWithoutStb
(
tableName
,
pThreadInfo
->
db_name
,
buffer
,
*
pRemainderBufLen
);
if
(
headLen
<=
0
)
{
return
0
;
}
pstr
+=
headLen
;
*
pRemainderBufLen
-=
headLen
;
int64_t
dataLen
;
return
generateDataTailWithoutStb
(
g_args
.
num_of_RPR
,
pstr
,
*
pRemainderBufLen
,
insertRows
,
recordFrom
,
startTime
,
/*pSamplePos, */
&
dataLen
);
}
static
void
printStatPerThread
(
threadInfo
*
pThreadInfo
)
{
fprintf
(
stderr
,
"====thread[%d] completed total inserted rows: %"
PRIu64
", total affected rows: %"
PRIu64
". %.2f records/second====
\n
"
,
fprintf
(
stderr
,
"====thread[%d] completed total inserted rows: %"
PRIu64
", total affected rows: %"
PRIu64
". %.2f records/second====
\n
"
,
pThreadInfo
->
threadID
,
pThreadInfo
->
totalInsertRows
,
pThreadInfo
->
totalAffectedRows
,
(
double
)(
pThreadInfo
->
totalAffectedRows
/
(
pThreadInfo
->
totalDelay
/
1000
.
0
)));
}
// sync write interlace data
static
void
*
syncWriteInterlace
(
threadInfo
*
pThreadInfo
)
{
debugPrint
(
"[%d] %s() LN%d: ### interlace write
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
);
int64_t
insertRows
;
uint64_t
interlaceRows
;
uint32_t
interlaceRows
;
uint64_t
maxSqlLen
;
int64_t
nTimeStampStep
;
uint64_t
insert_interval
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
...
...
@@ -5180,45 +5687,48 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
}
else
{
interlaceRows
=
superTblInfo
->
interlaceRows
;
}
maxSqlLen
=
superTblInfo
->
maxSqlLen
;
nTimeStampStep
=
superTblInfo
->
timeStampStep
;
insert_interval
=
superTblInfo
->
insertInterval
;
}
else
{
insertRows
=
g_args
.
num_of_DPT
;
interlaceRows
=
g_args
.
interlace_rows
;
maxSqlLen
=
g_args
.
max_sql_len
;
nTimeStampStep
=
DEFAULT_TIMESTAMP_STEP
;
insert_interval
=
g_args
.
insert_interval
;
}
debugPrint
(
"[%d] %s() LN%d: start_table_from=%"
PRIu64
" ntables=%"
PRId64
" insertRows=%"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
pThreadInfo
->
start_table_from
,
pThreadInfo
->
ntables
,
insertRows
);
if
(
interlaceRows
>
insertRows
)
interlaceRows
=
insertRows
;
if
(
interlaceRows
>
g_args
.
num_of_RPR
)
interlaceRows
=
g_args
.
num_of_RPR
;
int
insertMode
;
uint32_t
batchPerTbl
=
interlaceRows
;
uint32_t
batchPerTblTimes
;
if
(
interlaceRows
>
0
)
{
insertMode
=
INTERLACE_INSERT_MODE
;
if
((
interlaceRows
>
0
)
&&
(
pThreadInfo
->
ntables
>
1
))
{
batchPerTblTimes
=
g_args
.
num_of_RPR
/
interlaceRows
;
}
else
{
insertMode
=
PROGRESSIVE_INSERT_MODE
;
batchPerTblTimes
=
1
;
}
// TODO: prompt tbl count multple interlace rows and batch
//
uint64_t
maxSqlLen
=
superTblInfo
?
superTblInfo
->
maxSqlLen
:
g_args
.
max_sql_len
;
char
*
buffer
=
calloc
(
maxSqlLen
,
1
);
if
(
NULL
==
buffer
)
{
pThreadInfo
->
buffer
=
calloc
(
maxSqlLen
,
1
);
if
(
NULL
==
pThreadInfo
->
buffer
)
{
errorPrint
(
"%s() LN%d, Failed to alloc %"
PRIu64
" Bytes, reason:%s
\n
"
,
__func__
,
__LINE__
,
maxSqlLen
,
strerror
(
errno
));
return
NULL
;
}
char
tableName
[
TSDB_TABLE_NAME_LEN
];
pThreadInfo
->
totalInsertRows
=
0
;
pThreadInfo
->
totalAffectedRows
=
0
;
int64_t
nTimeStampStep
=
superTblInfo
?
superTblInfo
->
timeStampStep
:
DEFAULT_TIMESTAMP_STEP
;
uint64_t
insert_interval
=
superTblInfo
?
superTblInfo
->
insertInterval
:
g_args
.
insert_interval
;
uint64_t
st
=
0
;
uint64_t
et
=
UINT64_MAX
;
...
...
@@ -5227,69 +5737,98 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint64_t
endTs
;
uint64_t
tableSeq
=
pThreadInfo
->
start_table_from
;
debugPrint
(
"[%d] %s() LN%d: start_table_from=%"
PRIu64
" ntables=%"
PRId64
" insertRows=%"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
pThreadInfo
->
start_table_from
,
pThreadInfo
->
ntables
,
insertRows
);
int64_t
startTime
=
pThreadInfo
->
start_time
;
uint64_t
batchPerTbl
=
interlaceRows
;
uint64_t
batchPerTblTimes
;
if
((
interlaceRows
>
0
)
&&
(
pThreadInfo
->
ntables
>
1
))
{
batchPerTblTimes
=
g_args
.
num_of_RPR
/
interlaceRows
;
}
else
{
batchPerTblTimes
=
1
;
}
uint64_t
generatedRecPerTbl
=
0
;
bool
flagSleep
=
true
;
uint64_t
sleepTimeTotal
=
0
;
char
*
strInsertInto
=
"insert into "
;
int
nInsertBufLen
=
strlen
(
strInsertInto
);
while
(
pThreadInfo
->
totalInsertRows
<
pThreadInfo
->
ntables
*
insertRows
)
{
if
((
flagSleep
)
&&
(
insert_interval
))
{
st
=
taosGetTimestampMs
();
flagSleep
=
false
;
}
// generate data
memset
(
buffer
,
0
,
maxSqlLen
);
memset
(
pThreadInfo
->
buffer
,
0
,
maxSqlLen
);
uint64_t
remainderBufLen
=
maxSqlLen
;
char
*
pstr
=
buffer
;
char
*
pstr
=
pThreadInfo
->
buffer
;
int
len
=
snprintf
(
pstr
,
nInsertBufLen
+
1
,
"%s"
,
strInsertInto
);
int
len
=
snprintf
(
pstr
,
strlen
(
STR_INSERT_INTO
)
+
1
,
"%s"
,
STR_INSERT_INTO
);
pstr
+=
len
;
remainderBufLen
-=
len
;
uint64_t
recOfBatch
=
0
;
uint32_t
recOfBatch
=
0
;
for
(
uint32_t
i
=
0
;
i
<
batchPerTblTimes
;
i
++
)
{
char
tableName
[
TSDB_TABLE_NAME_LEN
];
for
(
uint64_t
i
=
0
;
i
<
batchPerTblTimes
;
i
++
)
{
getTableName
(
tableName
,
pThreadInfo
,
tableSeq
);
if
(
0
==
strlen
(
tableName
))
{
errorPrint
(
"[%d] %s() LN%d, getTableName return null
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
);
free
(
buffer
);
free
(
pThreadInfo
->
buffer
);
return
NULL
;
}
uint64_t
oldRemainderLen
=
remainderBufLen
;
int64_t
generated
=
generateInterlaceDataBuffer
(
tableName
,
batchPerTbl
,
i
,
batchPerTblTimes
,
tableSeq
,
pThreadInfo
,
pstr
,
insertRows
,
startTime
,
&
remainderBufLen
);
debugPrint
(
"[%d] %s() LN%d, generated records is %"
PRId64
"
\n
"
,
int32_t
generated
;
if
(
superTblInfo
)
{
if
(
superTblInfo
->
iface
==
STMT_IFACE
)
{
#if STMT_IFACE_ENABLED == 1
generated
=
prepareStbStmt
(
superTblInfo
,
pThreadInfo
->
stmt
,
tableName
,
batchPerTbl
,
insertRows
,
i
,
startTime
,
pThreadInfo
->
buffer
);
#else
generated
=
-
1
;
#endif
}
else
{
generated
=
generateStbInterlaceData
(
superTblInfo
,
tableName
,
batchPerTbl
,
i
,
batchPerTblTimes
,
tableSeq
,
pThreadInfo
,
pstr
,
insertRows
,
startTime
,
&
remainderBufLen
);
}
}
else
{
if
(
g_args
.
iface
==
STMT_IFACE
)
{
debugPrint
(
"[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
tableName
,
batchPerTbl
,
startTime
);
#if STMT_IFACE_ENABLED == 1
generated
=
prepareStmtWithoutStb
(
pThreadInfo
->
stmt
,
tableName
,
batchPerTbl
,
insertRows
,
i
,
startTime
);
#else
generated
=
-
1
;
#endif
}
else
{
generated
=
generateInterlaceDataWithoutStb
(
tableName
,
batchPerTbl
,
tableSeq
,
pThreadInfo
->
db_name
,
pstr
,
insertRows
,
startTime
,
&
remainderBufLen
);
}
}
debugPrint
(
"[%d] %s() LN%d, generated records is %d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
if
(
generated
<
0
)
{
errorPrint
(
"[%d] %s() LN%d, generated records is %
"
PRId64
"
\n
"
,
errorPrint
(
"[%d] %s() LN%d, generated records is %
d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
goto
free_of_interlace
;
}
else
if
(
generated
==
0
)
{
...
...
@@ -5298,15 +5837,15 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
tableSeq
++
;
recOfBatch
+=
batchPerTbl
;
pstr
+=
(
oldRemainderLen
-
remainderBufLen
);
// startTime += batchPerTbl * superTblInfo->timeStampStep;
pThreadInfo
->
totalInsertRows
+=
batchPerTbl
;
verbosePrint
(
"[%d] %s() LN%d batchPerTbl=%"
PRId64
" recOfBatch=%"
PRId64
"
\n
"
,
verbosePrint
(
"[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
batchPerTbl
,
recOfBatch
);
if
(
insertMode
==
INTERLACE_INSERT_MODE
)
{
if
(
tableSeq
==
pThreadInfo
->
start_table_from
+
pThreadInfo
->
ntables
)
{
if
(
tableSeq
==
pThreadInfo
->
start_table_from
+
pThreadInfo
->
ntables
)
{
// turn to first table
tableSeq
=
pThreadInfo
->
start_table_from
;
generatedRecPerTbl
+=
batchPerTbl
;
...
...
@@ -5318,13 +5857,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if
(
generatedRecPerTbl
>=
insertRows
)
break
;
int
remainRows
=
insertRows
-
generatedRecPerTbl
;
int
64_t
remainRows
=
insertRows
-
generatedRecPerTbl
;
if
((
remainRows
>
0
)
&&
(
batchPerTbl
>
remainRows
))
batchPerTbl
=
remainRows
;
if
(
pThreadInfo
->
ntables
*
batchPerTbl
<
g_args
.
num_of_RPR
)
break
;
}
}
verbosePrint
(
"[%d] %s() LN%d generatedRecPerTbl=%"
PRId64
" insertRows=%"
PRId64
"
\n
"
,
...
...
@@ -5335,22 +5873,22 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
break
;
}
verbosePrint
(
"[%d] %s() LN%d recOfBatch=%
"
PRIu64
"
totalInsertRows=%"
PRIu64
"
\n
"
,
verbosePrint
(
"[%d] %s() LN%d recOfBatch=%
d
totalInsertRows=%"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
,
pThreadInfo
->
totalInsertRows
);
verbosePrint
(
"[%d] %s() LN%d, buffer=%s
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
buffer
);
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
pThreadInfo
->
buffer
);
startTs
=
taosGetTimestampMs
();
if
(
recOfBatch
==
0
)
{
errorPrint
(
"[%d] %s() LN%d try inserting records of batch is %
"
PRIu64
"
\n
"
,
errorPrint
(
"[%d] %s() LN%d try inserting records of batch is %
d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
);
errorPrint
(
"%s
\n
"
,
"
\t
Please check if the batch or the buffer length is proper value!
\n
"
);
goto
free_of_interlace
;
}
int64_t
affectedRows
=
execInsert
(
pThreadInfo
,
buffer
,
recOfBatch
);
int64_t
affectedRows
=
execInsert
(
pThreadInfo
,
recOfBatch
);
endTs
=
taosGetTimestampMs
();
uint64_t
delay
=
endTs
-
startTs
;
...
...
@@ -5366,9 +5904,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo
->
totalDelay
+=
delay
;
if
(
recOfBatch
!=
affectedRows
)
{
errorPrint
(
"[%d] %s() LN%d execInsert insert %
"
PRIu64
"
, affected rows: %"
PRId64
"
\n
%s
\n
"
,
errorPrint
(
"[%d] %s() LN%d execInsert insert %
d
, affected rows: %"
PRId64
"
\n
%s
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
,
affectedRows
,
buffer
);
recOfBatch
,
affectedRows
,
pThreadInfo
->
buffer
);
goto
free_of_interlace
;
}
...
...
@@ -5387,8 +5925,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
et
=
taosGetTimestampMs
();
if
(
insert_interval
>
(
et
-
st
)
)
{
in
t
sleepTime
=
insert_interval
-
(
et
-
st
);
performancePrint
(
"%s() LN%d sleep: %
d
ms for insert interval
\n
"
,
uint64_
t
sleepTime
=
insert_interval
-
(
et
-
st
);
performancePrint
(
"%s() LN%d sleep: %
"
PRId64
"
ms for insert interval
\n
"
,
__func__
,
__LINE__
,
sleepTime
);
taosMsleep
(
sleepTime
);
// ms
sleepTimeTotal
+=
insert_interval
;
...
...
@@ -5397,27 +5935,26 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
}
free_of_interlace:
tmfree
(
buffer
);
tmfree
(
pThreadInfo
->
buffer
);
printStatPerThread
(
pThreadInfo
);
return
NULL
;
}
// sync insertion
/*
1 thread: 100 tables * 2000 rows/s
1 thread: 10 tables * 20000 rows/s
6 thread: 300 tables * 2000 rows/s
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
// sync insertion progressive data
static
void
*
syncWriteProgressive
(
threadInfo
*
pThreadInfo
)
{
debugPrint
(
"%s() LN%d: ### progressive write
\n
"
,
__func__
,
__LINE__
);
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
uint64_t
maxSqlLen
=
superTblInfo
?
superTblInfo
->
maxSqlLen
:
g_args
.
max_sql_len
;
int64_t
timeStampStep
=
superTblInfo
?
superTblInfo
->
timeStampStep
:
DEFAULT_TIMESTAMP_STEP
;
int64_t
insertRows
=
(
superTblInfo
)
?
superTblInfo
->
insertRows
:
g_args
.
num_of_DPT
;
verbosePrint
(
"%s() LN%d insertRows=%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
insertRows
);
char
*
buffer
=
calloc
(
maxSqlLen
,
1
);
if
(
NULL
==
buffer
)
{
pThreadInfo
->
buffer
=
calloc
(
maxSqlLen
,
1
);
if
(
NULL
==
pThreadInfo
->
buffer
)
{
errorPrint
(
"Failed to alloc %"
PRIu64
" Bytes, reason:%s
\n
"
,
maxSqlLen
,
strerror
(
errno
));
...
...
@@ -5428,35 +5965,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
uint64_t
startTs
=
taosGetTimestampMs
();
uint64_t
endTs
;
int64_t
timeStampStep
=
superTblInfo
?
superTblInfo
->
timeStampStep
:
DEFAULT_TIMESTAMP_STEP
;
/* int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0xffffffff;
*/
pThreadInfo
->
totalInsertRows
=
0
;
pThreadInfo
->
totalAffectedRows
=
0
;
pThreadInfo
->
samplePos
=
0
;
for
(
uint64_t
tableSeq
=
pThreadInfo
->
start_table_from
;
tableSeq
<=
pThreadInfo
->
end_table_to
;
tableSeq
++
)
{
for
(
uint64_t
tableSeq
=
pThreadInfo
->
start_table_from
;
tableSeq
<=
pThreadInfo
->
end_table_to
;
tableSeq
++
)
{
int64_t
start_time
=
pThreadInfo
->
start_time
;
int64_t
insertRows
=
(
superTblInfo
)
?
superTblInfo
->
insertRows
:
g_args
.
num_of_DPT
;
verbosePrint
(
"%s() LN%d insertRows=%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
insertRows
);
for
(
uint64_t
i
=
0
;
i
<
insertRows
;)
{
/*
if (insert_interval) {
st = taosGetTimestampMs();
}
*/
char
tableName
[
TSDB_TABLE_NAME_LEN
];
getTableName
(
tableName
,
pThreadInfo
,
tableSeq
);
verbosePrint
(
"%s() LN%d: tid=%d seq=%"
PRId64
" tableName=%s
\n
"
,
...
...
@@ -5464,19 +5983,57 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo
->
threadID
,
tableSeq
,
tableName
);
int64_t
remainderBufLen
=
maxSqlLen
;
char
*
pstr
=
buffer
;
int
nInsertBufLen
=
strlen
(
"insert into "
);
char
*
pstr
=
pThreadInfo
->
buffer
;
int
len
=
snprintf
(
pstr
,
nInsertBufLen
+
1
,
"%s"
,
"insert into "
);
int
len
=
snprintf
(
pstr
,
strlen
(
STR_INSERT_INTO
)
+
1
,
"%s"
,
STR_INSERT_INTO
);
pstr
+=
len
;
remainderBufLen
-=
len
;
int64_t
generated
=
generateProgressiveDataBuffer
(
tableName
,
tableSeq
,
pThreadInfo
,
pstr
,
insertRows
,
i
,
start_time
,
&
(
pThreadInfo
->
samplePos
),
&
remainderBufLen
);
int32_t
generated
;
if
(
superTblInfo
)
{
if
(
superTblInfo
->
iface
==
STMT_IFACE
)
{
#if STMT_IFACE_ENABLED == 1
generated
=
prepareStbStmt
(
superTblInfo
,
pThreadInfo
->
stmt
,
tableName
,
g_args
.
num_of_RPR
,
insertRows
,
i
,
start_time
,
pstr
);
#else
generated
=
-
1
;
#endif
}
else
{
generated
=
generateStbProgressiveData
(
superTblInfo
,
tableName
,
tableSeq
,
pThreadInfo
->
db_name
,
pstr
,
insertRows
,
i
,
start_time
,
&
(
pThreadInfo
->
samplePos
),
&
remainderBufLen
);
}
}
else
{
if
(
g_args
.
iface
==
STMT_IFACE
)
{
#if STMT_IFACE_ENABLED == 1
generated
=
prepareStmtWithoutStb
(
pThreadInfo
->
stmt
,
tableName
,
g_args
.
num_of_RPR
,
insertRows
,
i
,
start_time
);
#else
generated
=
-
1
;
#endif
}
else
{
generated
=
generateProgressiveDataWithoutStb
(
tableName
,
/* tableSeq, */
pThreadInfo
,
pstr
,
insertRows
,
i
,
start_time
,
/* &(pThreadInfo->samplePos), */
&
remainderBufLen
);
}
}
if
(
generated
>
0
)
i
+=
generated
;
else
...
...
@@ -5487,13 +6044,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
startTs
=
taosGetTimestampMs
();
int
64_t
affectedRows
=
execInsert
(
pThreadInfo
,
buffer
,
generated
);
int
32_t
affectedRows
=
execInsert
(
pThreadInfo
,
generated
);
endTs
=
taosGetTimestampMs
();
uint64_t
delay
=
endTs
-
startTs
;
performancePrint
(
"%s() LN%d, insert execution time is %"
PRId64
"ms
\n
"
,
__func__
,
__LINE__
,
delay
);
verbosePrint
(
"[%d] %s() LN%d affectedRows=%
"
PRId64
"
\n
"
,
verbosePrint
(
"[%d] %s() LN%d affectedRows=%
d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
affectedRows
);
...
...
@@ -5503,7 +6060,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo
->
totalDelay
+=
delay
;
if
(
affectedRows
<
0
)
{
errorPrint
(
"%s() LN%d, affected rows: %
"
PRId64
"
\n
"
,
errorPrint
(
"%s() LN%d, affected rows: %
d
\n
"
,
__func__
,
__LINE__
,
affectedRows
);
goto
free_of_progressive
;
}
...
...
@@ -5521,32 +6078,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if
(
i
>=
insertRows
)
break
;
/*
if (insert_interval) {
et = taosGetTimestampMs();
if (insert_interval > ((et - st)) ) {
int sleep_time = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %d ms for insert interval\n",
__func__, __LINE__, sleep_time);
taosMsleep(sleep_time); // ms
}
}
*/
}
// num_of_DPT
if
(
g_args
.
verbose_print
)
{
if
((
tableSeq
==
pThreadInfo
->
ntables
-
1
)
&&
superTblInfo
&&
if
(
(
g_args
.
verbose_print
)
&&
(
tableSeq
==
pThreadInfo
->
ntables
-
1
)
&&
(
superTblInfo
)
&&
(
0
==
strncasecmp
(
superTblInfo
->
dataSource
,
"sample"
,
strlen
(
"sample"
))))
{
verbosePrint
(
"%s() LN%d samplePos=%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
pThreadInfo
->
samplePos
);
}
}
}
// tableSeq
free_of_progressive:
tmfree
(
buffer
);
tmfree
(
pThreadInfo
->
buffer
);
printStatPerThread
(
pThreadInfo
);
return
NULL
;
}
...
...
@@ -5556,7 +6100,7 @@ static void* syncWrite(void *sarg) {
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
in
t
interlaceRows
;
uint32_
t
interlaceRows
;
if
(
superTblInfo
)
{
if
((
superTblInfo
->
interlaceRows
==
0
)
...
...
@@ -5576,7 +6120,6 @@ static void* syncWrite(void *sarg) {
// progressive mode
return
syncWriteProgressive
(
pThreadInfo
);
}
}
static
void
callBack
(
void
*
param
,
TAOS_RES
*
res
,
int
code
)
{
...
...
@@ -5616,9 +6159,10 @@ static void callBack(void *param, TAOS_RES *res, int code) {
&&
rand_num
<
pThreadInfo
->
superTblInfo
->
disorderRatio
)
{
int64_t
d
=
pThreadInfo
->
lastTs
-
(
taosRandom
()
%
pThreadInfo
->
superTblInfo
->
disorderRange
+
1
);
generate
RowData
(
data
,
d
,
pThreadInfo
->
superTblInfo
);
generate
StbRowData
(
pThreadInfo
->
superTblInfo
,
data
,
d
);
}
else
{
generateRowData
(
data
,
pThreadInfo
->
lastTs
+=
1000
,
pThreadInfo
->
superTblInfo
);
generateStbRowData
(
pThreadInfo
->
superTblInfo
,
data
,
pThreadInfo
->
lastTs
+=
1000
);
}
pstr
+=
sprintf
(
pstr
,
"%s"
,
data
);
pThreadInfo
->
counter
++
;
...
...
@@ -5686,24 +6230,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
static
void
startMultiThreadInsertData
(
int
threads
,
char
*
db_name
,
char
*
precision
,
SSuperTable
*
superTblInfo
)
{
pthread_t
*
pids
=
malloc
(
threads
*
sizeof
(
pthread_t
));
assert
(
pids
!=
NULL
);
threadInfo
*
infos
=
malloc
(
threads
*
sizeof
(
threadInfo
));
assert
(
infos
!=
NULL
);
memset
(
pids
,
0
,
threads
*
sizeof
(
pthread_t
));
memset
(
infos
,
0
,
threads
*
sizeof
(
threadInfo
));
//TAOS* taos;
//if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
// if (NULL == taos) {
// printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
// exit(-1);
// }
//}
int32_t
timePrec
=
TSDB_TIME_PRECISION_MILLI
;
if
(
0
!=
precision
[
0
])
{
if
(
0
==
strncasecmp
(
precision
,
"ms"
,
2
))
{
...
...
@@ -5755,17 +6281,17 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
}
TAOS
*
taos
=
taos_connect
(
TAOS
*
taos
0
=
taos_connect
(
g_Dbs
.
host
,
g_Dbs
.
user
,
g_Dbs
.
password
,
db_name
,
g_Dbs
.
port
);
if
(
NULL
==
taos
)
{
if
(
NULL
==
taos
0
)
{
errorPrint
(
"%s() LN%d, connect to server fail , reason: %s
\n
"
,
__func__
,
__LINE__
,
taos_errstr
(
NULL
));
exit
(
-
1
);
}
int64_t
ntables
=
0
;
uint64_t
start
From
;
uint64_t
table
From
;
if
(
superTblInfo
)
{
int64_t
limit
;
...
...
@@ -5792,7 +6318,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
ntables
=
limit
;
start
From
=
offset
;
table
From
=
offset
;
if
((
superTblInfo
->
childTblExists
!=
TBL_NO_EXISTS
)
&&
((
superTblInfo
->
childTblOffset
+
superTblInfo
->
childTblLimit
)
...
...
@@ -5811,23 +6337,23 @@ static void startMultiThreadInsertData(int threads, char* db_name,
limit
*
TSDB_TABLE_NAME_LEN
);
if
(
superTblInfo
->
childTblName
==
NULL
)
{
errorPrint
(
"%s() LN%d, alloc memory failed!
\n
"
,
__func__
,
__LINE__
);
taos_close
(
taos
);
taos_close
(
taos
0
);
exit
(
-
1
);
}
int64_t
childTblCount
;
getChildNameOfSuperTableWithLimitAndOffset
(
taos
,
taos
0
,
db_name
,
superTblInfo
->
sTblName
,
&
superTblInfo
->
childTblName
,
&
childTblCount
,
limit
,
offset
);
}
else
{
ntables
=
g_args
.
num_of_tables
;
start
From
=
0
;
table
From
=
0
;
}
taos_close
(
taos
);
taos_close
(
taos
0
);
int64_t
a
=
ntables
/
threads
;
if
(
a
<
1
)
{
...
...
@@ -5841,11 +6367,22 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
if
((
superTblInfo
)
&&
(
0
==
strncasecmp
(
superTblInfo
->
insertMode
,
"rest"
,
strlen
(
"rest"
))))
{
if
(
convertHostToServAddr
(
g_Dbs
.
host
,
g_Dbs
.
port
,
&
(
g_Dbs
.
serv_addr
))
!=
0
)
exit
(
-
1
);
&&
(
superTblInfo
->
iface
==
REST_IFACE
))
{
if
(
convertHostToServAddr
(
g_Dbs
.
host
,
g_Dbs
.
port
,
&
(
g_Dbs
.
serv_addr
))
!=
0
)
{
exit
(
-
1
);
}
}
pthread_t
*
pids
=
malloc
(
threads
*
sizeof
(
pthread_t
));
assert
(
pids
!=
NULL
);
threadInfo
*
infos
=
calloc
(
1
,
threads
*
sizeof
(
threadInfo
));
assert
(
infos
!=
NULL
);
memset
(
pids
,
0
,
threads
*
sizeof
(
pthread_t
));
memset
(
infos
,
0
,
threads
*
sizeof
(
threadInfo
));
for
(
int
i
=
0
;
i
<
threads
;
i
++
)
{
threadInfo
*
pThreadInfo
=
infos
+
i
;
pThreadInfo
->
threadID
=
i
;
...
...
@@ -5857,17 +6394,59 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo
->
minDelay
=
UINT64_MAX
;
if
((
NULL
==
superTblInfo
)
||
(
0
==
strncasecmp
(
superTblInfo
->
insertMode
,
"taosc"
,
5
)
))
{
//
pThreadI
nfo->taos = taos;
(
superTblInfo
->
iface
!=
REST_IFACE
))
{
//
t_i
nfo->taos = taos;
pThreadInfo
->
taos
=
taos_connect
(
g_Dbs
.
host
,
g_Dbs
.
user
,
g_Dbs
.
password
,
db_name
,
g_Dbs
.
port
);
if
(
NULL
==
pThreadInfo
->
taos
)
{
errorPrint
(
"connect to server fail from insert sub thread, reason: %s
\n
"
,
"%s() LN%d, connect to server fail from insert sub thread, reason: %s
\n
"
,
__func__
,
__LINE__
,
taos_errstr
(
NULL
));
free
(
infos
);
exit
(
-
1
);
}
if
((
g_args
.
iface
==
STMT_IFACE
)
||
((
superTblInfo
)
&&
(
superTblInfo
->
iface
==
STMT_IFACE
)))
{
int
columnCount
;
if
(
superTblInfo
)
{
columnCount
=
superTblInfo
->
columnCount
;
}
else
{
columnCount
=
g_args
.
num_of_CPR
;
}
pThreadInfo
->
stmt
=
taos_stmt_init
(
pThreadInfo
->
taos
);
if
(
NULL
==
pThreadInfo
->
stmt
)
{
errorPrint
(
"%s() LN%d, failed init stmt, reason: %s
\n
"
,
__func__
,
__LINE__
,
taos_errstr
(
NULL
));
free
(
pids
);
free
(
infos
);
exit
(
-
1
);
}
char
buffer
[
3000
];
char
*
pstr
=
buffer
;
pstr
+=
sprintf
(
pstr
,
"INSERT INTO ? values(?"
);
for
(
int
col
=
0
;
col
<
columnCount
;
col
++
)
{
pstr
+=
sprintf
(
pstr
,
",?"
);
}
pstr
+=
sprintf
(
pstr
,
")"
);
int
ret
=
taos_stmt_prepare
(
pThreadInfo
->
stmt
,
buffer
,
0
);
if
(
ret
!=
0
){
errorPrint
(
"failed to execute taos_stmt_prepare. return 0x%x. reason: %s
\n
"
,
ret
,
taos_errstr
(
NULL
));
free
(
pids
);
free
(
infos
);
exit
(
-
1
);
}
}
}
else
{
pThreadInfo
->
taos
=
NULL
;
}
...
...
@@ -5875,10 +6454,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
/* if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
*/
pThreadInfo
->
start_table_from
=
start
From
;
pThreadInfo
->
start_table_from
=
table
From
;
pThreadInfo
->
ntables
=
i
<
b
?
a
+
1
:
a
;
pThreadInfo
->
end_table_to
=
i
<
b
?
startFrom
+
a
:
start
From
+
a
-
1
;
start
From
=
pThreadInfo
->
end_table_to
+
1
;
pThreadInfo
->
end_table_to
=
i
<
b
?
tableFrom
+
a
:
table
From
+
a
-
1
;
table
From
=
pThreadInfo
->
end_table_to
+
1
;
/* } else {
pThreadInfo->start_table_from = 0;
pThreadInfo->ntables = superTblInfo->childTblCount;
...
...
@@ -5906,6 +6485,11 @@ static void startMultiThreadInsertData(int threads, char* db_name,
for
(
int
i
=
0
;
i
<
threads
;
i
++
)
{
threadInfo
*
pThreadInfo
=
infos
+
i
;
tsem_destroy
(
&
(
pThreadInfo
->
lock_sem
));
if
(
pThreadInfo
->
stmt
)
{
taos_stmt_close
(
pThreadInfo
->
stmt
);
}
tsem_destroy
(
&
(
pThreadInfo
->
lock_sem
));
taos_close
(
pThreadInfo
->
taos
);
...
...
@@ -6182,13 +6766,12 @@ static int insertTestProcess() {
}
}
taosMsleep
(
1000
);
// create sub threads for inserting data
//start = taosGetTimestampMs();
for
(
int
i
=
0
;
i
<
g_Dbs
.
dbCount
;
i
++
)
{
if
(
g_Dbs
.
use_metric
)
{
if
(
g_Dbs
.
db
[
i
].
superTblCount
>
0
)
{
for
(
in
t
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
for
(
uint64_
t
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
SSuperTable
*
superTblInfo
=
&
g_Dbs
.
db
[
i
].
superTbls
[
j
];
...
...
@@ -6512,15 +7095,15 @@ static int queryTestProcess() {
b
=
ntables
%
threads
;
}
uint64_t
start
From
=
0
;
uint64_t
table
From
=
0
;
for
(
int
i
=
0
;
i
<
threads
;
i
++
)
{
threadInfo
*
pThreadInfo
=
infosOfSub
+
i
;
pThreadInfo
->
threadID
=
i
;
pThreadInfo
->
start_table_from
=
start
From
;
pThreadInfo
->
start_table_from
=
table
From
;
pThreadInfo
->
ntables
=
i
<
b
?
a
+
1
:
a
;
pThreadInfo
->
end_table_to
=
i
<
b
?
startFrom
+
a
:
start
From
+
a
-
1
;
start
From
=
pThreadInfo
->
end_table_to
+
1
;
pThreadInfo
->
end_table_to
=
i
<
b
?
tableFrom
+
a
:
table
From
+
a
-
1
;
table
From
=
pThreadInfo
->
end_table_to
+
1
;
pThreadInfo
->
taos
=
NULL
;
// TODO: workaround to use separate taos connection;
pthread_create
(
pidsOfSub
+
i
,
NULL
,
superTableQuery
,
pThreadInfo
);
}
...
...
@@ -6826,11 +7409,14 @@ static void *specifiedSubscribe(void *sarg) {
g_queryInfo
.
specifiedQueryInfo
.
res
[
pThreadInfo
->
threadID
]
=
taos_consume
(
g_queryInfo
.
specifiedQueryInfo
.
tsub
[
pThreadInfo
->
threadID
]);
if
(
g_queryInfo
.
specifiedQueryInfo
.
res
[
pThreadInfo
->
threadID
])
{
if
(
g_queryInfo
.
specifiedQueryInfo
.
result
[
pThreadInfo
->
querySeq
][
0
]
!=
0
)
{
if
(
g_queryInfo
.
specifiedQueryInfo
.
result
[
pThreadInfo
->
querySeq
][
0
]
!=
0
)
{
sprintf
(
pThreadInfo
->
filePath
,
"%s-%d"
,
g_queryInfo
.
specifiedQueryInfo
.
result
[
pThreadInfo
->
querySeq
],
pThreadInfo
->
threadID
);
fetchResult
(
g_queryInfo
.
specifiedQueryInfo
.
res
[
pThreadInfo
->
threadID
],
pThreadInfo
);
fetchResult
(
g_queryInfo
.
specifiedQueryInfo
.
res
[
pThreadInfo
->
threadID
],
pThreadInfo
);
}
g_queryInfo
.
specifiedQueryInfo
.
consumed
[
pThreadInfo
->
threadID
]
++
;
...
...
@@ -6843,16 +7429,17 @@ static void *specifiedSubscribe(void *sarg) {
g_queryInfo
.
specifiedQueryInfo
.
consumed
[
pThreadInfo
->
threadID
]
=
0
;
taos_unsubscribe
(
g_queryInfo
.
specifiedQueryInfo
.
tsub
[
pThreadInfo
->
threadID
],
g_queryInfo
.
specifiedQueryInfo
.
subscribeKeepProgress
);
g_queryInfo
.
specifiedQueryInfo
.
tsub
[
pThreadInfo
->
threadID
]
=
subscribeImpl
(
SPECIFIED_CLASS
,
pThreadInfo
,
g_queryInfo
.
specifiedQueryInfo
.
sql
[
pThreadInfo
->
querySeq
],
g_queryInfo
.
specifiedQueryInfo
.
topic
[
pThreadInfo
->
threadID
],
g_queryInfo
.
specifiedQueryInfo
.
subscribeRestart
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeInterval
);
g_queryInfo
.
specifiedQueryInfo
.
tsub
[
pThreadInfo
->
threadID
]
=
subscribeImpl
(
SPECIFIED_CLASS
,
pThreadInfo
,
g_queryInfo
.
specifiedQueryInfo
.
sql
[
pThreadInfo
->
querySeq
],
g_queryInfo
.
specifiedQueryInfo
.
topic
[
pThreadInfo
->
threadID
],
g_queryInfo
.
specifiedQueryInfo
.
subscribeRestart
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeInterval
);
if
(
NULL
==
g_queryInfo
.
specifiedQueryInfo
.
tsub
[
pThreadInfo
->
threadID
])
{
taos_close
(
pThreadInfo
->
taos
);
return
NULL
;
taos_close
(
pThreadInfo
->
taos
);
return
NULL
;
}
}
}
...
...
@@ -6975,17 +7562,17 @@ static int subscribeTestProcess() {
}
for
(
uint64_t
i
=
0
;
i
<
g_queryInfo
.
superQueryInfo
.
sqlCount
;
i
++
)
{
uint64_t
start
From
=
0
;
uint64_t
table
From
=
0
;
for
(
int
j
=
0
;
j
<
threads
;
j
++
)
{
uint64_t
seq
=
i
*
threads
+
j
;
threadInfo
*
pThreadInfo
=
infosOfStable
+
seq
;
pThreadInfo
->
threadID
=
seq
;
pThreadInfo
->
querySeq
=
i
;
pThreadInfo
->
start_table_from
=
start
From
;
pThreadInfo
->
start_table_from
=
table
From
;
pThreadInfo
->
ntables
=
j
<
b
?
a
+
1
:
a
;
pThreadInfo
->
end_table_to
=
j
<
b
?
startFrom
+
a
:
start
From
+
a
-
1
;
start
From
=
pThreadInfo
->
end_table_to
+
1
;
pThreadInfo
->
end_table_to
=
j
<
b
?
tableFrom
+
a
:
table
From
+
a
-
1
;
table
From
=
pThreadInfo
->
end_table_to
+
1
;
pThreadInfo
->
taos
=
NULL
;
// TODO: workaround to use separate taos connection;
pthread_create
(
pidsOfStable
+
seq
,
NULL
,
superSubscribe
,
pThreadInfo
);
...
...
@@ -7065,7 +7652,7 @@ static void setParaFromArg(){
g_Dbs
.
threadCountByCreateTbl
=
g_args
.
num_of_threads
;
g_Dbs
.
dbCount
=
1
;
g_Dbs
.
db
[
0
].
drop
=
1
;
g_Dbs
.
db
[
0
].
drop
=
true
;
tstrncpy
(
g_Dbs
.
db
[
0
].
dbName
,
g_args
.
database
,
MAX_DB_NAME_SIZE
);
g_Dbs
.
db
[
0
].
dbCfg
.
replica
=
g_args
.
replica
;
...
...
@@ -7104,7 +7691,7 @@ static void setParaFromArg(){
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
childTblPrefix
,
g_args
.
tb_prefix
,
MAX_TB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
dataSource
,
"rand"
,
MAX_TB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
insertMode
,
"taosc"
,
MAX_TB_NAME_SIZE
)
;
g_Dbs
.
db
[
0
].
superTbls
[
0
].
iface
=
g_args
.
iface
;
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
startTimestamp
,
"2017-07-14 10:40:00.000"
,
MAX_TB_NAME_SIZE
);
g_Dbs
.
db
[
0
].
superTbls
[
0
].
timeStampStep
=
DEFAULT_TIMESTAMP_STEP
;
...
...
src/os/src/detail/osTime.c
浏览文件 @
58c6a901
...
...
@@ -87,12 +87,12 @@ static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t time
int32_t
taosGetTimestampSec
()
{
return
(
int32_t
)
time
(
NULL
);
}
int32_t
taosParseTime
(
char
*
timestr
,
int64_t
*
time
,
int32_t
len
,
int32_t
timePrec
,
int8_t
daylight
)
{
int32_t
taosParseTime
(
char
*
timestr
,
int64_t
*
time
,
int32_t
len
,
int32_t
timePrec
,
int8_t
day
_
light
)
{
/* parse datatime string in with tz */
if
(
strnchr
(
timestr
,
'T'
,
len
,
false
)
!=
NULL
)
{
return
parseTimeWithTz
(
timestr
,
time
,
timePrec
);
}
else
{
return
(
*
parseLocaltimeFp
[
daylight
])(
timestr
,
time
,
timePrec
);
return
(
*
parseLocaltimeFp
[
day
_
light
])(
timestr
,
time
,
timePrec
);
}
}
...
...
src/vnode/src/vnodeMgmt.c
浏览文件 @
58c6a901
...
...
@@ -91,18 +91,18 @@ static void vnodeIncRef(void *ptNode) {
}
void
*
vnodeAcquire
(
int32_t
vgId
)
{
SVnodeObj
*
*
p
pVnode
=
NULL
;
SVnodeObj
*
pVnode
=
NULL
;
if
(
tsVnodesHash
!=
NULL
)
{
ppVnode
=
taosHashGetClone
(
tsVnodesHash
,
&
vgId
,
sizeof
(
int32_t
),
vnodeIncRef
,
NULL
,
sizeof
(
void
*
));
taosHashGetClone
(
tsVnodesHash
,
&
vgId
,
sizeof
(
int32_t
),
vnodeIncRef
,
&
pVnode
,
sizeof
(
void
*
));
}
if
(
p
pVnode
==
NULL
||
*
pp
Vnode
==
NULL
)
{
if
(
pVnode
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
vDebug
(
"vgId:%d, not exist"
,
vgId
);
return
NULL
;
}
return
*
p
pVnode
;
return
pVnode
;
}
void
vnodeRelease
(
void
*
vparam
)
{
...
...
tests/pytest/fulltest.sh
浏览文件 @
58c6a901
...
...
@@ -332,5 +332,5 @@ python3 ./test.py -f tag_lite/alter_tag.py
python3 test.py
-f
tools/taosdemoAllTest/taosdemoTestInsertWithJson.py
python3 test.py
-f
tools/taosdemoAllTest/taosdemoTestQueryWithJson.py
python3 ./test.py
-f
tag_lite/drop_auto_create.py
#======================p4-end===============
tests/pytest/tag_lite/drop_auto_create.py
0 → 100644
浏览文件 @
58c6a901
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
def
run
(
self
):
tdSql
.
prepare
()
tdSql
.
execute
(
'create table m1(ts timestamp, k int) tags(a binary(12), b int, c double);'
)
tdSql
.
execute
(
'insert into tm0 using m1(b,c) tags(1, 99) values(now, 1);'
)
tdSql
.
execute
(
'insert into tm1 using m1(b,c) tags(2, 100) values(now, 2);'
)
tdLog
.
info
(
"2 rows inserted"
)
tdSql
.
query
(
'select * from m1;'
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
'select *,tbname from m1;'
)
tdSql
.
execute
(
"drop table tm0; "
)
tdSql
.
query
(
'select * from m1'
)
tdSql
.
checkRows
(
1
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录