Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
763f7126
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
763f7126
编写于
5月 07, 2021
作者:
Y
yihaoDeng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'ms'
上级
2242be05
c4bc5649
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
235 addition
and
217 deletion
+235
-217
.drone.yml
.drone.yml
+7
-15
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+107
-86
src/kit/taosdump/taosdump.c
src/kit/taosdump/taosdump.c
+117
-113
src/plugins/http/inc/httpInt.h
src/plugins/http/inc/httpInt.h
+1
-1
src/util/src/tnettest.c
src/util/src/tnettest.c
+3
-2
未找到文件。
.drone.yml
浏览文件 @
763f7126
...
...
@@ -7,27 +7,19 @@ platform:
arch
:
amd64
steps
:
-
name
:
build
image
:
gcc
commands
:
-
apt-get update
-
apt-get install -y cmake build-essential
-
mkdir debug
-
cd debug
-
cmake ..
-
make
when
:
branch
:
-
develop
-
master
-
name
:
smoke_test
image
:
python:3.8
commands
:
-
apt-get update
-
apt-get install -y cmake build-essential gcc
-
pip3 install psutil
-
pip3 install guppy3
-
pip3 install src/connector/python/linux/python3/
-
cd tests
-
mkdir debug
-
cd debug
-
cmake ..
-
make
-
cd ../tests
-
./test-all.sh smoke
when
:
branch
:
...
...
src/kit/taosdemo/taosdemo.c
浏览文件 @
763f7126
...
...
@@ -4672,7 +4672,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return
0
;
}
static
int64_t
execInsert
(
threadInfo
*
pThreadInfo
,
char
*
buffer
,
in
t
k
)
static
int64_t
execInsert
(
threadInfo
*
pThreadInfo
,
char
*
buffer
,
uint64_
t
k
)
{
int
affectedRows
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
...
...
@@ -4744,7 +4744,7 @@ static int64_t generateDataTail(
verbosePrint
(
"%s() LN%d batch=%"
PRIu64
"
\n
"
,
__func__
,
__LINE__
,
batch
);
u
int64_t
k
=
0
;
int64_t
k
=
0
;
for
(
k
=
0
;
k
<
batch
;)
{
char
data
[
MAX_DATA_SIZE
];
memset
(
data
,
0
,
MAX_DATA_SIZE
);
...
...
@@ -4959,7 +4959,7 @@ static int64_t generateInterlaceDataBuffer(
return
k
;
}
static
int
generateProgressiveDataBuffer
(
static
int
64_t
generateProgressiveDataBuffer
(
char
*
tableName
,
int64_t
tableSeq
,
threadInfo
*
pThreadInfo
,
char
*
buffer
,
...
...
@@ -5004,12 +5004,21 @@ static int generateProgressiveDataBuffer(
return
k
;
}
static
void
printStatPerThread
(
threadInfo
*
pThreadInfo
)
{
fprintf
(
stderr
,
"====thread[%d] completed total inserted rows: %"
PRIu64
", total affected rows: %"
PRIu64
". %.2f records/second====
\n
"
,
pThreadInfo
->
threadID
,
pThreadInfo
->
totalInsertRows
,
pThreadInfo
->
totalAffectedRows
,
(
double
)(
pThreadInfo
->
totalAffectedRows
/
(
pThreadInfo
->
totalDelay
/
1000
.
0
)));
}
static
void
*
syncWriteInterlace
(
threadInfo
*
pThreadInfo
)
{
debugPrint
(
"[%d] %s() LN%d: ### interlace write
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
);
int64_t
insertRows
;
int64_t
interlaceRows
;
u
int64_t
insertRows
;
u
int64_t
interlaceRows
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
...
...
@@ -5078,9 +5087,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
assert
(
pThreadInfo
->
ntables
>
0
);
int64_t
batchPerTbl
=
interlaceRows
;
uint64_t
batchPerTbl
=
interlaceRows
;
uint64_t
batchPerTblTimes
;
int64_t
batchPerTblTimes
;
if
((
interlaceRows
>
0
)
&&
(
pThreadInfo
->
ntables
>
1
))
{
batchPerTblTimes
=
g_args
.
num_of_RPR
/
interlaceRows
;
...
...
@@ -5088,9 +5097,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
batchPerTblTimes
=
1
;
}
int64_t
generatedRecPerTbl
=
0
;
u
int64_t
generatedRecPerTbl
=
0
;
bool
flagSleep
=
true
;
int64_t
sleepTimeTotal
=
0
;
u
int64_t
sleepTimeTotal
=
0
;
char
*
strInsertInto
=
"insert into "
;
int
nInsertBufLen
=
strlen
(
strInsertInto
);
...
...
@@ -5110,9 +5119,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pstr
+=
len
;
remainderBufLen
-=
len
;
int64_t
recOfBatch
=
0
;
u
int64_t
recOfBatch
=
0
;
for
(
int64_t
i
=
0
;
i
<
batchPerTblTimes
;
i
++
)
{
for
(
u
int64_t
i
=
0
;
i
<
batchPerTblTimes
;
i
++
)
{
getTableName
(
tableName
,
pThreadInfo
,
tableSeq
);
if
(
0
==
strlen
(
tableName
))
{
errorPrint
(
"[%d] %s() LN%d, getTableName return null
\n
"
,
...
...
@@ -5130,10 +5139,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
startTime
,
&
remainderBufLen
);
debugPrint
(
"[%d] %s() LN%d, generated records is %"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
if
(
generated
<
0
)
{
debugPrint
(
"[%d] %s() LN%d, generated data
is %"
PRId64
"
\n
"
,
errorPrint
(
"[%d] %s() LN%d, generated records
is %"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
goto
free_
and_statistics
_interlace
;
goto
free_
of
_interlace
;
}
else
if
(
generated
==
0
)
{
break
;
}
...
...
@@ -5177,7 +5188,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
break
;
}
verbosePrint
(
"[%d] %s() LN%d recOfBatch=%"
PRI
d64
" totalInsertRows=%"
PRId
64
"
\n
"
,
verbosePrint
(
"[%d] %s() LN%d recOfBatch=%"
PRI
u64
" totalInsertRows=%"
PRIu
64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
,
pThreadInfo
->
totalInsertRows
);
verbosePrint
(
"[%d] %s() LN%d, buffer=%s
\n
"
,
...
...
@@ -5188,30 +5199,30 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t
affectedRows
=
execInsert
(
pThreadInfo
,
buffer
,
recOfBatch
);
endTs
=
taosGetTimestampMs
();
int64_t
delay
=
endTs
-
startTs
;
performancePrint
(
"%s() LN%d, insert execution time is %"
PRI
d
64
"ms
\n
"
,
u
int64_t
delay
=
endTs
-
startTs
;
performancePrint
(
"%s() LN%d, insert execution time is %"
PRI
u
64
"ms
\n
"
,
__func__
,
__LINE__
,
delay
);
verbosePrint
(
"[%d] %s() LN%d affectedRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
affectedRows
);
if
(
delay
>
pThreadInfo
->
maxDelay
)
pThreadInfo
->
maxDelay
=
delay
;
if
(
delay
<
pThreadInfo
->
minDelay
)
pThreadInfo
->
minDelay
=
delay
;
pThreadInfo
->
cntDelay
++
;
pThreadInfo
->
totalDelay
+=
delay
;
verbosePrint
(
"[%d] %s() LN%d affectedRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
affectedRows
);
if
((
affectedRows
<
0
)
||
(
recOfBatch
!=
affectedRows
))
{
errorPrint
(
"[%d] %s() LN%d execInsert insert %"
PRId64
", affected rows: %"
PRId64
"
\n
%s
\n
"
,
if
(
recOfBatch
!=
affectedRows
)
{
errorPrint
(
"[%d] %s() LN%d execInsert insert %"
PRIu64
", affected rows: %"
PRId64
"
\n
%s
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
,
affectedRows
,
buffer
);
goto
free_
and_statistics
_interlace
;
goto
free_
of
_interlace
;
}
pThreadInfo
->
totalAffectedRows
+=
affectedRows
;
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
printf
(
"thread[%d] has currently inserted rows: %"
PRI
d64
", affected rows: %"
PRId
64
"
\n
"
,
printf
(
"thread[%d] has currently inserted rows: %"
PRI
u64
", affected rows: %"
PRIu
64
"
\n
"
,
pThreadInfo
->
threadID
,
pThreadInfo
->
totalInsertRows
,
pThreadInfo
->
totalAffectedRows
);
...
...
@@ -5231,13 +5242,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
}
}
free_
and_statistics
_interlace:
free_
of
_interlace:
tmfree
(
buffer
);
printf
(
"====thread[%d] completed total inserted rows: %"
PRId64
", total affected rows: %"
PRId64
"====
\n
"
,
pThreadInfo
->
threadID
,
pThreadInfo
->
totalInsertRows
,
pThreadInfo
->
totalAffectedRows
);
printStatPerThread
(
pThreadInfo
);
return
NULL
;
}
...
...
@@ -5253,19 +5260,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint
(
"%s() LN%d: ### progressive write
\n
"
,
__func__
,
__LINE__
);
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
in
t
maxSqlLen
=
superTblInfo
?
superTblInfo
->
maxSqlLen
:
g_args
.
max_sql_len
;
uint64_
t
maxSqlLen
=
superTblInfo
?
superTblInfo
->
maxSqlLen
:
g_args
.
max_sql_len
;
char
*
buffer
=
calloc
(
maxSqlLen
,
1
);
if
(
NULL
==
buffer
)
{
errorPrint
(
"Failed to alloc %
d
Bytes, reason:%s
\n
"
,
errorPrint
(
"Failed to alloc %
"
PRIu64
"
Bytes, reason:%s
\n
"
,
maxSqlLen
,
strerror
(
errno
));
return
NULL
;
}
int64_t
lastPrintTime
=
taosGetTimestampMs
();
int64_t
startTs
=
taosGetTimestampMs
();
int64_t
endTs
;
u
int64_t
lastPrintTime
=
taosGetTimestampMs
();
u
int64_t
startTs
=
taosGetTimestampMs
();
u
int64_t
endTs
;
int64_t
timeStampStep
=
superTblInfo
?
superTblInfo
->
timeStampStep
:
DEFAULT_TIMESTAMP_STEP
;
...
...
@@ -5280,15 +5287,15 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo
->
samplePos
=
0
;
for
(
int64_t
tableSeq
=
for
(
u
int64_t
tableSeq
=
pThreadInfo
->
start_table_from
;
tableSeq
<=
pThreadInfo
->
end_table_to
;
tableSeq
++
)
{
int64_t
start_time
=
pThreadInfo
->
start_time
;
int64_t
insertRows
=
(
superTblInfo
)
?
superTblInfo
->
insertRows
:
g_args
.
num_of_DPT
;
u
int64_t
insertRows
=
(
superTblInfo
)
?
superTblInfo
->
insertRows
:
g_args
.
num_of_DPT
;
verbosePrint
(
"%s() LN%d insertRows=%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
insertRows
);
for
(
int64_t
i
=
0
;
i
<
insertRows
;)
{
for
(
u
int64_t
i
=
0
;
i
<
insertRows
;)
{
/*
if (insert_interval) {
st = taosGetTimestampMs();
...
...
@@ -5310,7 +5317,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pstr
+=
len
;
remainderBufLen
-=
len
;
int
generated
=
generateProgressiveDataBuffer
(
int
64_t
generated
=
generateProgressiveDataBuffer
(
tableName
,
tableSeq
,
pThreadInfo
,
pstr
,
insertRows
,
i
,
start_time
,
&
(
pThreadInfo
->
samplePos
),
...
...
@@ -5318,7 +5325,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if
(
generated
>
0
)
i
+=
generated
;
else
goto
free_
and_statistics_2
;
goto
free_
of_progressive
;
start_time
+=
generated
*
timeStampStep
;
pThreadInfo
->
totalInsertRows
+=
generated
;
...
...
@@ -5328,17 +5335,23 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int64_t
affectedRows
=
execInsert
(
pThreadInfo
,
buffer
,
generated
);
endTs
=
taosGetTimestampMs
();
int64_t
delay
=
endTs
-
startTs
;
u
int64_t
delay
=
endTs
-
startTs
;
performancePrint
(
"%s() LN%d, insert execution time is %"
PRId64
"ms
\n
"
,
__func__
,
__LINE__
,
delay
);
verbosePrint
(
"[%d] %s() LN%d affectedRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
affectedRows
);
if
(
delay
>
pThreadInfo
->
maxDelay
)
pThreadInfo
->
maxDelay
=
delay
;
if
(
delay
<
pThreadInfo
->
minDelay
)
pThreadInfo
->
minDelay
=
delay
;
pThreadInfo
->
cntDelay
++
;
pThreadInfo
->
totalDelay
+=
delay
;
if
(
affectedRows
<
0
)
goto
free_and_statistics_2
;
if
(
affectedRows
<
0
)
{
errorPrint
(
"%s() LN%d, affected rows: %"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
affectedRows
);
goto
free_of_progressive
;
}
pThreadInfo
->
totalAffectedRows
+=
affectedRows
;
...
...
@@ -5377,13 +5390,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
}
}
// tableSeq
free_
and_statistics_2
:
free_
of_progressive
:
tmfree
(
buffer
);
printf
(
"====thread[%d] completed total inserted rows: %"
PRId64
", total affected rows: %"
PRId64
"====
\n
"
,
pThreadInfo
->
threadID
,
pThreadInfo
->
totalInsertRows
,
pThreadInfo
->
totalAffectedRows
);
printStatPerThread
(
pThreadInfo
);
return
NULL
;
}
...
...
@@ -5412,6 +5421,7 @@ static void* syncWrite(void *sarg) {
// progressive mode
return
syncWriteProgressive
(
pThreadInfo
);
}
}
static
void
callBack
(
void
*
param
,
TAOS_RES
*
res
,
int
code
)
{
...
...
@@ -5737,10 +5747,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pthread_join
(
pids
[
i
],
NULL
);
}
int64_t
totalDelay
=
0
;
int64_t
maxDelay
=
0
;
int64_t
minDelay
=
UINT64_MAX
;
int64_t
cntDelay
=
1
;
u
int64_t
totalDelay
=
0
;
u
int64_t
maxDelay
=
0
;
u
int64_t
minDelay
=
UINT64_MAX
;
u
int64_t
cntDelay
=
1
;
double
avgDelay
=
0
;
for
(
int
i
=
0
;
i
<
threads
;
i
++
)
{
...
...
@@ -5749,7 +5759,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tsem_destroy
(
&
(
t_info
->
lock_sem
));
taos_close
(
t_info
->
taos
);
debugPrint
(
"%s() LN%d, [%d] totalInsert=%"
PRI
d64
" totalAffected=%"
PRId
64
"
\n
"
,
debugPrint
(
"%s() LN%d, [%d] totalInsert=%"
PRI
u64
" totalAffected=%"
PRIu
64
"
\n
"
,
__func__
,
__LINE__
,
t_info
->
threadID
,
t_info
->
totalInsertRows
,
t_info
->
totalAffectedRows
);
...
...
@@ -5775,35 +5785,42 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t
t
=
end
-
start
;
if
(
superTblInfo
)
{
printf
(
"Spent %.2f seconds to insert rows: %"
PRId64
", affected rows: %"
PRId64
" with %d thread(s) into %s.%s. %2.
f records/second
\n\n
"
,
fprintf
(
stderr
,
"Spent %.2f seconds to insert rows: %"
PRIu64
", affected rows: %"
PRIu64
" with %d thread(s) into %s.%s. %.2
f records/second
\n\n
"
,
t
/
1000
.
0
,
superTblInfo
->
totalInsertRows
,
superTblInfo
->
totalAffectedRows
,
threads
,
db_name
,
superTblInfo
->
sTblName
,
(
double
)
superTblInfo
->
totalInsertRows
/
(
t
/
1000
.
0
));
fprintf
(
g_fpOfInsertResult
,
"Spent %.2f seconds to insert rows: %"
PRId64
", affected rows: %"
PRId64
" with %d thread(s) into %s.%s. %2.f records/second
\n\n
"
,
if
(
g_fpOfInsertResult
)
{
fprintf
(
g_fpOfInsertResult
,
"Spent %.2f seconds to insert rows: %"
PRIu64
", affected rows: %"
PRIu64
" with %d thread(s) into %s.%s. %.2f records/second
\n\n
"
,
t
/
1000
.
0
,
superTblInfo
->
totalInsertRows
,
superTblInfo
->
totalAffectedRows
,
threads
,
db_name
,
superTblInfo
->
sTblName
,
(
double
)
superTblInfo
->
totalInsertRows
/
(
t
/
1000
.
0
));
}
}
else
{
printf
(
"Spent %.2f seconds to insert rows: %"
PRId64
", affected rows: %"
PRId64
" with %d thread(s) into %s %2.
f records/second
\n\n
"
,
fprintf
(
stderr
,
"Spent %.2f seconds to insert rows: %"
PRIu64
", affected rows: %"
PRIu64
" with %d thread(s) into %s %.2
f records/second
\n\n
"
,
t
/
1000
.
0
,
g_args
.
totalInsertRows
,
g_args
.
totalAffectedRows
,
threads
,
db_name
,
(
double
)
g_args
.
totalInsertRows
/
(
t
/
1000
.
0
));
fprintf
(
g_fpOfInsertResult
,
"Spent %.2f seconds to insert rows: %"
PRId64
", affected rows: %"
PRId64
" with %d thread(s) into %s %2.f records/second
\n\n
"
,
if
(
g_fpOfInsertResult
)
{
fprintf
(
g_fpOfInsertResult
,
"Spent %.2f seconds to insert rows: %"
PRIu64
", affected rows: %"
PRIu64
" with %d thread(s) into %s %.2f records/second
\n\n
"
,
t
*
1000
.
0
,
g_args
.
totalInsertRows
,
g_args
.
totalAffectedRows
,
threads
,
db_name
,
(
double
)
g_args
.
totalInsertRows
/
(
t
/
1000
.
0
));
}
}
printf
(
"insert delay, avg: %10.2fms, max: %"
PRId64
"ms, min: %"
PRId
64
"ms
\n\n
"
,
fprintf
(
stderr
,
"insert delay, avg: %10.2fms, max: %"
PRIu64
"ms, min: %"
PRIu
64
"ms
\n\n
"
,
avgDelay
,
maxDelay
,
minDelay
);
fprintf
(
g_fpOfInsertResult
,
"insert delay, avg:%10.2fms, max: %"
PRId64
"ms, min: %"
PRId64
"ms
\n\n
"
,
if
(
g_fpOfInsertResult
)
{
fprintf
(
g_fpOfInsertResult
,
"insert delay, avg:%10.2fms, max: %"
PRIu64
"ms, min: %"
PRIu64
"ms
\n\n
"
,
avgDelay
,
maxDelay
,
minDelay
);
}
//taos_close(taos);
...
...
@@ -5973,7 +5990,8 @@ static int insertTestProcess() {
return
-
1
;
}
printfInsertMetaToFile
(
g_fpOfInsertResult
);
if
(
g_fpOfInsertResult
)
printfInsertMetaToFile
(
g_fpOfInsertResult
);
if
(
!
g_args
.
answer_yes
)
{
printf
(
"Press enter key to continue
\n\n
"
);
...
...
@@ -5984,7 +6002,8 @@ static int insertTestProcess() {
// create database and super tables
if
(
createDatabasesAndStables
()
!=
0
)
{
fclose
(
g_fpOfInsertResult
);
if
(
g_fpOfInsertResult
)
fclose
(
g_fpOfInsertResult
);
return
-
1
;
}
...
...
@@ -6000,11 +6019,13 @@ static int insertTestProcess() {
end
=
taosGetTimestampMs
();
if
(
g_totalChildTables
>
0
)
{
printf
(
"Spent %.4f seconds to create %d tables with %d thread(s)
\n\n
"
,
fprintf
(
stderr
,
"Spent %.4f seconds to create %d tables with %d thread(s)
\n\n
"
,
(
end
-
start
)
/
1000
.
0
,
g_totalChildTables
,
g_Dbs
.
threadCountByCreateTbl
);
fprintf
(
g_fpOfInsertResult
,
if
(
g_fpOfInsertResult
)
{
fprintf
(
g_fpOfInsertResult
,
"Spent %.4f seconds to create %d tables with %d thread(s)
\n\n
"
,
(
end
-
start
)
/
1000
.
0
,
g_totalChildTables
,
g_Dbs
.
threadCountByCreateTbl
);
}
}
taosMsleep
(
1000
);
...
...
@@ -6077,14 +6098,14 @@ static void *specifiedTableQuery(void *sarg) {
return
NULL
;
}
int64_t
st
=
0
;
int64_t
et
=
0
;
u
int64_t
st
=
0
;
u
int64_t
et
=
0
;
in
t
queryTimes
=
g_queryInfo
.
specifiedQueryInfo
.
queryTimes
;
uint64_
t
queryTimes
=
g_queryInfo
.
specifiedQueryInfo
.
queryTimes
;
in
t
totalQueried
=
0
;
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
uint64_
t
totalQueried
=
0
;
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
while
(
queryTimes
--
)
{
if
(
g_queryInfo
.
specifiedQueryInfo
.
queryInterval
&&
(
et
-
st
)
<
...
...
@@ -6135,7 +6156,7 @@ static void *specifiedTableQuery(void *sarg) {
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
debugPrint
(
"%s() LN%d, endTs=%"
PRIu64
"ms, startTs=%"
PRIu64
"ms
\n
"
,
__func__
,
__LINE__
,
endTs
,
startTs
);
printf
(
"thread[%d] has currently completed queries: %
d
, QPS: %10.6f
\n
"
,
printf
(
"thread[%d] has currently completed queries: %
"
PRIu64
"
, QPS: %10.6f
\n
"
,
pThreadInfo
->
threadID
,
totalQueried
,
(
double
)(
totalQueried
/
((
endTs
-
startTs
)
/
1000
.
0
)));
...
...
@@ -6187,14 +6208,14 @@ static void *superTableQuery(void *sarg) {
}
}
int64_t
st
=
0
;
int64_t
et
=
(
int64_t
)
g_queryInfo
.
superQueryInfo
.
queryInterval
;
u
int64_t
st
=
0
;
u
int64_t
et
=
(
int64_t
)
g_queryInfo
.
superQueryInfo
.
queryInterval
;
in
t
queryTimes
=
g_queryInfo
.
superQueryInfo
.
queryTimes
;
in
t
totalQueried
=
0
;
int64_t
startTs
=
taosGetTimestampMs
();
uint64_
t
queryTimes
=
g_queryInfo
.
superQueryInfo
.
queryTimes
;
uint64_
t
totalQueried
=
0
;
u
int64_t
startTs
=
taosGetTimestampMs
();
int64_t
lastPrintTime
=
taosGetTimestampMs
();
u
int64_t
lastPrintTime
=
taosGetTimestampMs
();
while
(
queryTimes
--
)
{
if
(
g_queryInfo
.
superQueryInfo
.
queryInterval
&&
(
et
-
st
)
<
(
int64_t
)
g_queryInfo
.
superQueryInfo
.
queryInterval
)
{
...
...
@@ -6221,7 +6242,7 @@ static void *superTableQuery(void *sarg) {
int64_t
currentPrintTime
=
taosGetTimestampMs
();
int64_t
endTs
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
printf
(
"thread[%d] has currently completed queries: %
d
, QPS: %10.3f
\n
"
,
printf
(
"thread[%d] has currently completed queries: %
"
PRIu64
"
, QPS: %10.3f
\n
"
,
pThreadInfo
->
threadID
,
totalQueried
,
(
double
)(
totalQueried
/
((
endTs
-
startTs
)
/
1000
.
0
)));
...
...
@@ -6285,7 +6306,7 @@ static int queryTestProcess() {
int
nConcurrent
=
g_queryInfo
.
specifiedQueryInfo
.
concurrent
;
int
nSqlCount
=
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
;
int64_t
startTs
=
taosGetTimestampMs
();
u
int64_t
startTs
=
taosGetTimestampMs
();
if
((
nSqlCount
>
0
)
&&
(
nConcurrent
>
0
))
{
...
...
@@ -6345,16 +6366,16 @@ static int queryTestProcess() {
ERROR_EXIT
(
"memory allocation failed for create threads
\n
"
);
}
in
t
ntables
=
g_queryInfo
.
superQueryInfo
.
childTblCount
;
uint64_
t
ntables
=
g_queryInfo
.
superQueryInfo
.
childTblCount
;
int
threads
=
g_queryInfo
.
superQueryInfo
.
threadCnt
;
in
t
a
=
ntables
/
threads
;
uint64_
t
a
=
ntables
/
threads
;
if
(
a
<
1
)
{
threads
=
ntables
;
a
=
1
;
}
in
t
b
=
0
;
uint64_
t
b
=
0
;
if
(
threads
!=
0
)
{
b
=
ntables
%
threads
;
}
...
...
@@ -6396,12 +6417,12 @@ static int queryTestProcess() {
tmfree
((
char
*
)
infosOfSub
);
// taos_close(taos);// TODO: workaround to use separate taos connection;
int64_t
endTs
=
taosGetTimestampMs
();
u
int64_t
endTs
=
taosGetTimestampMs
();
in
t
totalQueried
=
g_queryInfo
.
specifiedQueryInfo
.
totalQueried
+
uint64_
t
totalQueried
=
g_queryInfo
.
specifiedQueryInfo
.
totalQueried
+
g_queryInfo
.
superQueryInfo
.
totalQueried
;
printf
(
"==== completed total queries: %d
, the QPS of all threads: %10.3f====
\n
"
,
fprintf
(
stderr
,
"==== completed total queries: %"
PRIu64
"
, the QPS of all threads: %10.3f====
\n
"
,
totalQueried
,
(
double
)(
totalQueried
/
((
endTs
-
startTs
)
/
1000
.
0
)));
return
0
;
...
...
src/kit/taosdump/taosdump.c
浏览文件 @
763f7126
...
...
@@ -72,7 +72,8 @@ enum _show_db_index {
TSDB_SHOW_DB_WALLEVEL_INDEX
,
TSDB_SHOW_DB_FSYNC_INDEX
,
TSDB_SHOW_DB_COMP_INDEX
,
TSDB_SHOW_DB_PRECISION_INDEX
,
TSDB_SHOW_DB_CACHELAST_INDEX
,
TSDB_SHOW_DB_PRECISION_INDEX
,
TSDB_SHOW_DB_UPDATE_INDEX
,
TSDB_SHOW_DB_STATUS_INDEX
,
TSDB_MAX_SHOW_DB
...
...
@@ -83,10 +84,10 @@ enum _show_tables_index {
TSDB_SHOW_TABLES_NAME_INDEX
,
TSDB_SHOW_TABLES_CREATED_TIME_INDEX
,
TSDB_SHOW_TABLES_COLUMNS_INDEX
,
TSDB_SHOW_TABLES_METRIC_INDEX
,
TSDB_SHOW_TABLES_UID_INDEX
,
TSDB_SHOW_TABLES_METRIC_INDEX
,
TSDB_SHOW_TABLES_UID_INDEX
,
TSDB_SHOW_TABLES_TID_INDEX
,
TSDB_SHOW_TABLES_VGID_INDEX
,
TSDB_SHOW_TABLES_VGID_INDEX
,
TSDB_MAX_SHOW_TABLES
};
...
...
@@ -134,6 +135,7 @@ typedef struct {
int8_t
wallevel
;
int32_t
fsync
;
int8_t
comp
;
int8_t
cachelast
;
char
precision
[
8
];
// time resolution
int8_t
update
;
char
status
[
16
];
...
...
@@ -360,19 +362,19 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
arguments
->
data_batch
=
atoi
(
arg
);
if
(
arguments
->
data_batch
>=
INT16_MAX
)
{
arguments
->
data_batch
=
INT16_MAX
-
1
;
}
}
break
;
case
'L'
:
case
'L'
:
{
int32_t
len
=
atoi
(
arg
);
if
(
len
>
TSDB_MAX_ALLOWED_SQL_LEN
)
{
len
=
TSDB_MAX_ALLOWED_SQL_LEN
;
}
else
if
(
len
<
TSDB_MAX_SQL_LEN
)
{
len
=
TSDB_MAX_SQL_LEN
;
}
}
arguments
->
max_sql_len
=
len
;
break
;
}
}
case
't'
:
arguments
->
table_batch
=
atoi
(
arg
);
break
;
...
...
@@ -415,12 +417,12 @@ static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, i
struct
arguments
g_args
=
{
// connection option
NULL
,
"root"
,
NULL
,
"root"
,
#ifdef _TD_POWER_
"powerdb"
,
"powerdb"
,
#else
"taosdata"
,
"taosdata"
,
#endif
0
,
""
,
...
...
@@ -677,10 +679,10 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo, TAOS
}
sprintf
(
tempCommand
,
"show tables like %s"
,
table
);
result
=
taos_query
(
taosCon
,
tempCommand
);
result
=
taos_query
(
taosCon
,
tempCommand
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tempCommand
);
free
(
tempCommand
);
...
...
@@ -707,12 +709,12 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo, TAOS
free
(
tempCommand
);
return
0
;
}
sprintf
(
tempCommand
,
"show stables like %s"
,
table
);
result
=
taos_query
(
taosCon
,
tempCommand
);
result
=
taos_query
(
taosCon
,
tempCommand
);
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tempCommand
);
free
(
tempCommand
);
...
...
@@ -750,7 +752,7 @@ int32_t taosSaveAllNormalTableToTempFile(TAOS *taosCon, char*meter, char* metric
return
-
1
;
}
}
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
tstrncpy
(
tableRecord
.
name
,
meter
,
TSDB_TABLE_NAME_LEN
);
tstrncpy
(
tableRecord
.
metric
,
metric
,
TSDB_TABLE_NAME_LEN
);
...
...
@@ -772,7 +774,7 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
}
sprintf
(
tmpCommand
,
"select tbname from %s"
,
metric
);
TAOS_RES
*
res
=
taos_query
(
taosCon
,
tmpCommand
);
int32_t
code
=
taos_errno
(
res
);
if
(
code
!=
0
)
{
...
...
@@ -794,20 +796,20 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
res
);
int32_t
numOfTable
=
0
;
while
((
row
=
taos_fetch_row
(
res
))
!=
NULL
)
{
while
((
row
=
taos_fetch_row
(
res
))
!=
NULL
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
tstrncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
0
],
fields
[
0
].
bytes
);
tstrncpy
(
tableRecord
.
metric
,
metric
,
TSDB_TABLE_NAME_LEN
);
taosWrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
taosWrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
numOfTable
++
;
}
taos_free_result
(
res
);
lseek
(
fd
,
0
,
SEEK_SET
);
int
maxThreads
=
arguments
->
thread_num
;
int
tableOfPerFile
;
if
(
numOfTable
<=
arguments
->
thread_num
)
{
...
...
@@ -817,16 +819,16 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
tableOfPerFile
=
numOfTable
/
arguments
->
thread_num
;
if
(
0
!=
numOfTable
%
arguments
->
thread_num
)
{
tableOfPerFile
+=
1
;
}
}
}
char
*
tblBuf
=
(
char
*
)
calloc
(
1
,
tableOfPerFile
*
sizeof
(
STableRecord
));
if
(
NULL
==
tblBuf
){
fprintf
(
stderr
,
"failed to calloc %"
PRIzu
"
\n
"
,
tableOfPerFile
*
sizeof
(
STableRecord
));
fprintf
(
stderr
,
"failed to calloc %"
PRIzu
"
\n
"
,
tableOfPerFile
*
sizeof
(
STableRecord
));
close
(
fd
);
return
-
1
;
}
int32_t
numOfThread
=
*
totalNumOfThread
;
int
subFd
=
-
1
;
for
(;
numOfThread
<
maxThreads
;
numOfThread
++
)
{
...
...
@@ -840,7 +842,7 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
(
void
)
remove
(
tmpBuf
);
}
sprintf
(
tmpBuf
,
".select-tbname.tmp"
);
(
void
)
remove
(
tmpBuf
);
(
void
)
remove
(
tmpBuf
);
free
(
tblBuf
);
close
(
fd
);
return
-
1
;
...
...
@@ -858,11 +860,11 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
sprintf
(
tmpBuf
,
".select-tbname.tmp"
);
(
void
)
remove
(
tmpBuf
);
if
(
fd
>=
0
)
{
close
(
fd
);
fd
=
-
1
;
}
}
*
totalNumOfThread
=
numOfThread
;
...
...
@@ -886,7 +888,7 @@ int taosDumpOut(struct arguments *arguments) {
}
else
{
sprintf
(
tmpBuf
,
"dbs.sql"
);
}
fp
=
fopen
(
tmpBuf
,
"w"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"failed to open file %s
\n
"
,
tmpBuf
);
...
...
@@ -918,9 +920,9 @@ int taosDumpOut(struct arguments *arguments) {
taosDumpCharset
(
fp
);
sprintf
(
command
,
"show databases"
);
result
=
taos_query
(
taos
,
command
);
result
=
taos_query
(
taos
,
command
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command: %s, reason: %s
\n
"
,
command
,
taos_errstr
(
result
));
goto
_exit_failure
;
...
...
@@ -960,12 +962,12 @@ int taosDumpOut(struct arguments *arguments) {
strncpy
(
dbInfos
[
count
]
->
name
,
(
char
*
)
row
[
TSDB_SHOW_DB_NAME_INDEX
],
fields
[
TSDB_SHOW_DB_NAME_INDEX
].
bytes
);
if
(
arguments
->
with_property
)
{
dbInfos
[
count
]
->
ntables
=
*
((
int32_t
*
)
row
[
TSDB_SHOW_DB_NTABLES_INDEX
]);
dbInfos
[
count
]
->
vgroups
=
*
((
int32_t
*
)
row
[
TSDB_SHOW_DB_VGROUPS_INDEX
]);
dbInfos
[
count
]
->
vgroups
=
*
((
int32_t
*
)
row
[
TSDB_SHOW_DB_VGROUPS_INDEX
]);
dbInfos
[
count
]
->
replica
=
*
((
int16_t
*
)
row
[
TSDB_SHOW_DB_REPLICA_INDEX
]);
dbInfos
[
count
]
->
quorum
=
*
((
int16_t
*
)
row
[
TSDB_SHOW_DB_QUORUM_INDEX
]);
dbInfos
[
count
]
->
days
=
*
((
int16_t
*
)
row
[
TSDB_SHOW_DB_DAYS_INDEX
]);
dbInfos
[
count
]
->
days
=
*
((
int16_t
*
)
row
[
TSDB_SHOW_DB_DAYS_INDEX
]);
strncpy
(
dbInfos
[
count
]
->
keeplist
,
(
char
*
)
row
[
TSDB_SHOW_DB_KEEP_INDEX
],
fields
[
TSDB_SHOW_DB_KEEP_INDEX
].
bytes
);
strncpy
(
dbInfos
[
count
]
->
keeplist
,
(
char
*
)
row
[
TSDB_SHOW_DB_KEEP_INDEX
],
fields
[
TSDB_SHOW_DB_KEEP_INDEX
].
bytes
);
//dbInfos[count]->daysToKeep = *((int16_t *)row[TSDB_SHOW_DB_KEEP_INDEX]);
//dbInfos[count]->daysToKeep1;
//dbInfos[count]->daysToKeep2;
...
...
@@ -976,8 +978,9 @@ int taosDumpOut(struct arguments *arguments) {
dbInfos
[
count
]
->
wallevel
=
*
((
int8_t
*
)
row
[
TSDB_SHOW_DB_WALLEVEL_INDEX
]);
dbInfos
[
count
]
->
fsync
=
*
((
int32_t
*
)
row
[
TSDB_SHOW_DB_FSYNC_INDEX
]);
dbInfos
[
count
]
->
comp
=
(
int8_t
)(
*
((
int8_t
*
)
row
[
TSDB_SHOW_DB_COMP_INDEX
]));
dbInfos
[
count
]
->
cachelast
=
(
int8_t
)(
*
((
int8_t
*
)
row
[
TSDB_SHOW_DB_CACHELAST_INDEX
]));
strncpy
(
dbInfos
[
count
]
->
precision
,
(
char
*
)
row
[
TSDB_SHOW_DB_PRECISION_INDEX
],
fields
[
TSDB_SHOW_DB_PRECISION_INDEX
].
bytes
);
strncpy
(
dbInfos
[
count
]
->
precision
,
(
char
*
)
row
[
TSDB_SHOW_DB_PRECISION_INDEX
],
fields
[
TSDB_SHOW_DB_PRECISION_INDEX
].
bytes
);
//dbInfos[count]->precision = *((int8_t *)row[TSDB_SHOW_DB_PRECISION_INDEX]);
dbInfos
[
count
]
->
update
=
*
((
int8_t
*
)
row
[
TSDB_SHOW_DB_UPDATE_INDEX
]);
}
...
...
@@ -1009,8 +1012,8 @@ int taosDumpOut(struct arguments *arguments) {
g_resultStatistics
.
totalDatabasesOfDumpOut
++
;
sprintf
(
command
,
"use %s"
,
dbInfos
[
0
]
->
name
);
result
=
taos_query
(
taos
,
command
);
result
=
taos_query
(
taos
,
command
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"invalid database %s
\n
"
,
dbInfos
[
0
]
->
name
);
...
...
@@ -1040,7 +1043,7 @@ int taosDumpOut(struct arguments *arguments) {
int
ret
=
taosDumpStable
(
tableRecordInfo
.
tableRecord
.
metric
,
fp
,
taos
,
dbInfos
[
0
]
->
name
);
if
(
0
==
ret
)
{
superTblCnt
++
;
}
}
}
retCode
=
taosSaveAllNormalTableToTempFile
(
taos
,
tableRecordInfo
.
tableRecord
.
name
,
tableRecordInfo
.
tableRecord
.
metric
,
&
normalTblFd
);
}
...
...
@@ -1052,7 +1055,7 @@ int taosDumpOut(struct arguments *arguments) {
goto
_clean_tmp_file
;
}
}
// TODO: save dump super table <superTblCnt> into result_output.txt
fprintf
(
g_fpOfResult
,
"# super table counter: %d
\n
"
,
superTblCnt
);
g_resultStatistics
.
totalSuperTblsOfDumpOut
+=
superTblCnt
;
...
...
@@ -1078,7 +1081,7 @@ int taosDumpOut(struct arguments *arguments) {
taos_close
(
taos
);
taos_free_result
(
result
);
tfree
(
command
);
taosFreeDbInfos
();
taosFreeDbInfos
();
fprintf
(
stderr
,
"dump out rows: %"
PRId64
"
\n
"
,
totalDumpOutRows
);
return
0
;
...
...
@@ -1099,8 +1102,8 @@ int taosGetTableDes(char* dbName, char *table, STableDef *tableDes, TAOS* taosCo
char
sqlstr
[
COMMAND_SIZE
];
sprintf
(
sqlstr
,
"describe %s.%s;"
,
dbName
,
table
);
res
=
taos_query
(
taosCon
,
sqlstr
);
res
=
taos_query
(
taosCon
,
sqlstr
);
int32_t
code
=
taos_errno
(
res
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command <%s>, reason:%s
\n
"
,
sqlstr
,
taos_errstr
(
res
));
...
...
@@ -1130,23 +1133,23 @@ int taosGetTableDes(char* dbName, char *table, STableDef *tableDes, TAOS* taosCo
if
(
isSuperTable
)
{
return
count
;
}
// if chidl-table have tag, using select tagName from table to get tagValue
for
(
int
i
=
0
;
i
<
count
;
i
++
)
{
if
(
strcmp
(
tableDes
->
cols
[
i
].
note
,
"TAG"
)
!=
0
)
continue
;
sprintf
(
sqlstr
,
"select %s from %s.%s"
,
tableDes
->
cols
[
i
].
field
,
dbName
,
table
);
res
=
taos_query
(
taosCon
,
sqlstr
);
res
=
taos_query
(
taosCon
,
sqlstr
);
code
=
taos_errno
(
res
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command <%s>, reason:%s
\n
"
,
sqlstr
,
taos_errstr
(
res
));
taos_free_result
(
res
);
return
-
1
;
}
fields
=
taos_fetch_fields
(
res
);
fields
=
taos_fetch_fields
(
res
);
row
=
taos_fetch_row
(
res
);
if
(
NULL
==
row
)
{
...
...
@@ -1161,7 +1164,7 @@ int taosGetTableDes(char* dbName, char *table, STableDef *tableDes, TAOS* taosCo
res
=
NULL
;
continue
;
}
int32_t
*
length
=
taos_fetch_lengths
(
res
);
//int32_t* length = taos_fetch_lengths(tmpResult);
...
...
@@ -1198,7 +1201,7 @@ int taosGetTableDes(char* dbName, char *table, STableDef *tableDes, TAOS* taosCo
}
case
TSDB_DATA_TYPE_NCHAR
:
{
memset
(
tableDes
->
cols
[
i
].
note
,
0
,
sizeof
(
tableDes
->
cols
[
i
].
note
));
char
tbuf
[
COL_NOTE_LEN
-
2
];
// need reserve 2 bytes for ' '
char
tbuf
[
COL_NOTE_LEN
-
2
];
// need reserve 2 bytes for ' '
convertNCharToReadable
((
char
*
)
row
[
0
],
length
[
0
],
tbuf
,
COL_NOTE_LEN
);
sprintf
(
tableDes
->
cols
[
i
].
note
,
"
\'
%s
\'
"
,
tbuf
);
break
;
...
...
@@ -1221,9 +1224,9 @@ int taosGetTableDes(char* dbName, char *table, STableDef *tableDes, TAOS* taosCo
default:
break
;
}
taos_free_result
(
res
);
res
=
NULL
;
res
=
NULL
;
}
return
count
;
...
...
@@ -1282,9 +1285,10 @@ void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
pstr
+=
sprintf
(
pstr
,
"CREATE DATABASE IF NOT EXISTS %s "
,
dbInfo
->
name
);
if
(
isDumpProperty
)
{
pstr
+=
sprintf
(
pstr
,
"TABLES %d VGROUPS %d REPLICA %d QUORUM %d DAYS %d KEEP %s CACHE %d BLOCKS %d MINROWS %d MAXROWS %d WALLEVEL %d FYNC %d COMP %d PRECISION '%s' UPDATE %d"
,
dbInfo
->
ntables
,
dbInfo
->
vgroups
,
dbInfo
->
replica
,
dbInfo
->
quorum
,
dbInfo
->
days
,
dbInfo
->
keeplist
,
dbInfo
->
cache
,
dbInfo
->
blocks
,
dbInfo
->
minrows
,
dbInfo
->
maxrows
,
dbInfo
->
wallevel
,
dbInfo
->
fsync
,
dbInfo
->
comp
,
dbInfo
->
precision
,
dbInfo
->
update
);
"REPLICA %d QUORUM %d DAYS %d KEEP %s CACHE %d BLOCKS %d MINROWS %d MAXROWS %d FSYNC %d CACHELAST %d COMP %d PRECISION '%s' UPDATE %d"
,
dbInfo
->
replica
,
dbInfo
->
quorum
,
dbInfo
->
days
,
dbInfo
->
keeplist
,
dbInfo
->
cache
,
dbInfo
->
blocks
,
dbInfo
->
minrows
,
dbInfo
->
maxrows
,
dbInfo
->
fsync
,
dbInfo
->
cachelast
,
dbInfo
->
comp
,
dbInfo
->
precision
,
dbInfo
->
update
);
}
pstr
+=
sprintf
(
pstr
,
";"
);
...
...
@@ -1295,8 +1299,8 @@ void* taosDumpOutWorkThreadFp(void *arg)
{
SThreadParaObj
*
pThread
=
(
SThreadParaObj
*
)
arg
;
STableRecord
tableRecord
;
int
fd
;
int
fd
;
char
tmpBuf
[
TSDB_FILENAME_LEN
*
4
]
=
{
0
};
sprintf
(
tmpBuf
,
".tables.tmp.%d"
,
pThread
->
threadIndex
);
fd
=
open
(
tmpBuf
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
...
...
@@ -1307,13 +1311,13 @@ void* taosDumpOutWorkThreadFp(void *arg)
FILE
*
fp
=
NULL
;
memset
(
tmpBuf
,
0
,
TSDB_FILENAME_LEN
+
128
);
if
(
g_args
.
outpath
[
0
]
!=
0
)
{
sprintf
(
tmpBuf
,
"%s/%s.tables.%d.sql"
,
g_args
.
outpath
,
pThread
->
dbName
,
pThread
->
threadIndex
);
}
else
{
sprintf
(
tmpBuf
,
"%s.tables.%d.sql"
,
pThread
->
dbName
,
pThread
->
threadIndex
);
}
fp
=
fopen
(
tmpBuf
,
"w"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"failed to open file %s
\n
"
,
tmpBuf
);
...
...
@@ -1323,13 +1327,13 @@ void* taosDumpOutWorkThreadFp(void *arg)
memset
(
tmpBuf
,
0
,
TSDB_FILENAME_LEN
);
sprintf
(
tmpBuf
,
"use %s"
,
pThread
->
dbName
);
TAOS_RES
*
tmpResult
=
taos_query
(
pThread
->
taosCon
,
tmpBuf
);
TAOS_RES
*
tmpResult
=
taos_query
(
pThread
->
taosCon
,
tmpBuf
);
int32_t
code
=
taos_errno
(
tmpResult
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"invalid database %s
\n
"
,
pThread
->
dbName
);
taos_free_result
(
tmpResult
);
fclose
(
fp
);
fclose
(
fp
);
close
(
fd
);
return
NULL
;
}
...
...
@@ -1347,7 +1351,7 @@ void* taosDumpOutWorkThreadFp(void *arg)
// TODO: sum table count and table rows by self
pThread
->
tablesOfDumpOut
++
;
pThread
->
rowsOfDumpOut
+=
ret
;
if
(
pThread
->
rowsOfDumpOut
>=
lastRowsPrint
)
{
printf
(
" %"
PRId64
" rows already be dumpout from database %s
\n
"
,
pThread
->
rowsOfDumpOut
,
pThread
->
dbName
);
lastRowsPrint
+=
5000000
;
...
...
@@ -1357,15 +1361,15 @@ void* taosDumpOutWorkThreadFp(void *arg)
if
(
tablesInOneFile
>=
g_args
.
table_batch
)
{
fclose
(
fp
);
tablesInOneFile
=
0
;
memset
(
tmpBuf
,
0
,
TSDB_FILENAME_LEN
+
128
);
memset
(
tmpBuf
,
0
,
TSDB_FILENAME_LEN
+
128
);
if
(
g_args
.
outpath
[
0
]
!=
0
)
{
sprintf
(
tmpBuf
,
"%s/%s.tables.%d-%d.sql"
,
g_args
.
outpath
,
pThread
->
dbName
,
pThread
->
threadIndex
,
fileNameIndex
);
}
else
{
sprintf
(
tmpBuf
,
"%s.tables.%d-%d.sql"
,
pThread
->
dbName
,
pThread
->
threadIndex
,
fileNameIndex
);
}
fileNameIndex
++
;
fp
=
fopen
(
tmpBuf
,
"w"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"failed to open file %s
\n
"
,
tmpBuf
);
...
...
@@ -1379,7 +1383,7 @@ void* taosDumpOutWorkThreadFp(void *arg)
taos_free_result
(
tmpResult
);
close
(
fd
);
fclose
(
fp
);
fclose
(
fp
);
return
NULL
;
}
...
...
@@ -1395,7 +1399,7 @@ static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, i
pThread
->
threadIndex
=
t
;
pThread
->
totalThreads
=
numOfThread
;
tstrncpy
(
pThread
->
dbName
,
dbName
,
TSDB_TABLE_NAME_LEN
);
pThread
->
taosCon
=
taosCon
;
pThread
->
taosCon
=
taosCon
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
...
...
@@ -1410,7 +1414,7 @@ static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, i
pthread_join
(
threadObj
[
t
].
threadID
,
NULL
);
}
// TODO: sum all thread dump table count and rows of per table, then save into result_output.txt
// TODO: sum all thread dump table count and rows of per table, then save into result_output.txt
int64_t
totalRowsOfDumpOut
=
0
;
int64_t
totalChildTblsOfDumpOut
=
0
;
for
(
int32_t
t
=
0
;
t
<
numOfThread
;
++
t
)
{
...
...
@@ -1451,7 +1455,7 @@ int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon, char* dbName) {
}
int32_t
taosDumpCreateSuperTableClause
(
TAOS
*
taosCon
,
char
*
dbName
,
FILE
*
fp
)
int32_t
taosDumpCreateSuperTableClause
(
TAOS
*
taosCon
,
char
*
dbName
,
FILE
*
fp
)
{
TAOS_ROW
row
;
int
fd
=
-
1
;
...
...
@@ -1459,8 +1463,8 @@ int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp)
char
sqlstr
[
TSDB_MAX_SQL_LEN
]
=
{
0
};
sprintf
(
sqlstr
,
"show %s.stables"
,
dbName
);
TAOS_RES
*
res
=
taos_query
(
taosCon
,
sqlstr
);
TAOS_RES
*
res
=
taos_query
(
taosCon
,
sqlstr
);
int32_t
code
=
taos_errno
(
res
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command <%s>, reason: %s
\n
"
,
sqlstr
,
taos_errstr
(
res
));
...
...
@@ -1480,13 +1484,13 @@ int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp)
(
void
)
remove
(
".stables.tmp"
);
exit
(
-
1
);
}
while
((
row
=
taos_fetch_row
(
res
))
!=
NULL
)
{
while
((
row
=
taos_fetch_row
(
res
))
!=
NULL
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_NAME_INDEX
],
fields
[
TSDB_SHOW_TABLES_NAME_INDEX
].
bytes
);
taosWrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
}
}
taos_free_result
(
res
);
(
void
)
lseek
(
fd
,
0
,
SEEK_SET
);
...
...
@@ -1494,7 +1498,7 @@ int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp)
while
(
1
)
{
ssize_t
readLen
=
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
if
(
readLen
<=
0
)
break
;
int
ret
=
taosDumpStable
(
tableRecord
.
name
,
fp
,
taosCon
,
dbName
);
if
(
0
==
ret
)
{
superTblCnt
++
;
...
...
@@ -1507,8 +1511,8 @@ int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp)
close
(
fd
);
(
void
)
remove
(
".stables.tmp"
);
return
0
;
return
0
;
}
...
...
@@ -1518,19 +1522,19 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
STableRecord
tableRecord
;
taosDumpCreateDbClause
(
dbInfo
,
arguments
->
with_property
,
fp
);
fprintf
(
g_fpOfResult
,
"
\n
#### database: %s
\n
"
,
dbInfo
->
name
);
g_resultStatistics
.
totalDatabasesOfDumpOut
++
;
char
sqlstr
[
TSDB_MAX_SQL_LEN
]
=
{
0
};
fprintf
(
fp
,
"USE %s;
\n\n
"
,
dbInfo
->
name
);
(
void
)
taosDumpCreateSuperTableClause
(
taosCon
,
dbInfo
->
name
,
fp
);
sprintf
(
sqlstr
,
"show %s.tables"
,
dbInfo
->
name
);
TAOS_RES
*
res
=
taos_query
(
taosCon
,
sqlstr
);
TAOS_RES
*
res
=
taos_query
(
taosCon
,
sqlstr
);
int
code
=
taos_errno
(
res
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command <%s>, reason:%s
\n
"
,
sqlstr
,
taos_errstr
(
res
));
...
...
@@ -1549,15 +1553,15 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
res
);
int32_t
numOfTable
=
0
;
while
((
row
=
taos_fetch_row
(
res
))
!=
NULL
)
{
while
((
row
=
taos_fetch_row
(
res
))
!=
NULL
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
tstrncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_NAME_INDEX
],
fields
[
TSDB_SHOW_TABLES_NAME_INDEX
].
bytes
);
tstrncpy
(
tableRecord
.
metric
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_METRIC_INDEX
],
fields
[
TSDB_SHOW_TABLES_METRIC_INDEX
].
bytes
);
taosWrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
numOfTable
++
;
}
taos_free_result
(
res
);
...
...
@@ -1572,7 +1576,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
tableOfPerFile
=
numOfTable
/
g_args
.
thread_num
;
if
(
0
!=
numOfTable
%
g_args
.
thread_num
)
{
tableOfPerFile
+=
1
;
}
}
}
char
*
tblBuf
=
(
char
*
)
calloc
(
1
,
tableOfPerFile
*
sizeof
(
STableRecord
));
...
...
@@ -1581,7 +1585,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
close
(
fd
);
return
-
1
;
}
int32_t
numOfThread
=
0
;
int
subFd
=
-
1
;
for
(
numOfThread
=
0
;
numOfThread
<
maxThreads
;
numOfThread
++
)
{
...
...
@@ -1618,7 +1622,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
close
(
fd
);
fd
=
-
1
;
}
taos_free_result
(
res
);
// start multi threads to dumpout
...
...
@@ -1626,7 +1630,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
for
(
int
loopCnt
=
0
;
loopCnt
<
numOfThread
;
loopCnt
++
)
{
sprintf
(
tmpBuf
,
".tables.tmp.%d"
,
loopCnt
);
(
void
)
remove
(
tmpBuf
);
}
}
free
(
tblBuf
);
return
0
;
...
...
@@ -1737,7 +1741,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
char
*
pstr
=
NULL
;
TAOS_ROW
row
=
NULL
;
int
numFields
=
0
;
if
(
arguments
->
schemaonly
)
{
return
0
;
}
...
...
@@ -1752,11 +1756,11 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
pstr
=
tmpBuffer
;
char
sqlstr
[
1024
]
=
{
0
};
sprintf
(
sqlstr
,
"select * from %s.%s where _c0 >= %"
PRId64
" and _c0 <= %"
PRId64
" order by _c0 asc;"
,
sprintf
(
sqlstr
,
"select * from %s.%s where _c0 >= %"
PRId64
" and _c0 <= %"
PRId64
" order by _c0 asc;"
,
dbName
,
tbname
,
arguments
->
start_time
,
arguments
->
end_time
);
TAOS_RES
*
tmpResult
=
taos_query
(
taosCon
,
sqlstr
);
TAOS_RES
*
tmpResult
=
taos_query
(
taosCon
,
sqlstr
);
int32_t
code
=
taos_errno
(
tmpResult
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, reason: %s
\n
"
,
sqlstr
,
taos_errstr
(
tmpResult
));
...
...
@@ -1776,7 +1780,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
while
((
row
=
taos_fetch_row
(
tmpResult
))
!=
NULL
)
{
pstr
=
tmpBuffer
;
curr_sqlstr_len
=
0
;
int32_t
*
length
=
taos_fetch_lengths
(
tmpResult
);
// act len
if
(
count
==
0
)
{
...
...
@@ -1831,7 +1835,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
converStringToReadable
((
char
*
)
row
[
col
],
length
[
col
],
tbuf
,
COMMAND_SIZE
);
//pstr = stpcpy(pstr, tbuf);
//*(pstr++) = '\'';
pstr
+=
sprintf
(
pstr
+
curr_sqlstr_len
,
"
\'
%s
\'
"
,
tbuf
);
pstr
+=
sprintf
(
pstr
+
curr_sqlstr_len
,
"
\'
%s
\'
"
,
tbuf
);
break
;
}
case
TSDB_DATA_TYPE_NCHAR
:
{
...
...
@@ -1859,10 +1863,10 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
curr_sqlstr_len
+=
sprintf
(
pstr
+
curr_sqlstr_len
,
") "
);
totalRows
++
;
totalRows
++
;
count
++
;
fprintf
(
fp
,
"%s"
,
tmpBuffer
);
if
(
totalRows
>=
lastRowsPrint
)
{
printf
(
" %"
PRId64
" rows already be dumpout from %s.%s
\n
"
,
totalRows
,
dbName
,
tbname
);
lastRowsPrint
+=
5000000
;
...
...
@@ -2208,7 +2212,7 @@ static FILE* taosOpenDumpInFile(char *fptr) {
}
char
*
fname
=
full_path
.
we_wordv
[
0
];
FILE
*
f
=
fopen
(
fname
,
"r"
);
if
(
f
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to open file %s
\n
"
,
fname
);
...
...
@@ -2242,7 +2246,7 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c
line
[
--
read_len
]
=
'\0'
;
//if (read_len == 0 || isCommentLine(line)) { // line starts with #
if
(
read_len
==
0
)
{
if
(
read_len
==
0
)
{
continue
;
}
...
...
@@ -2261,8 +2265,8 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c
}
memset
(
cmd
,
0
,
TSDB_MAX_ALLOWED_SQL_LEN
);
cmd_len
=
0
;
cmd_len
=
0
;
if
(
lineNo
>=
lastRowsPrint
)
{
printf
(
" %d lines already be executed from file %s
\n
"
,
lineNo
,
fileName
);
lastRowsPrint
+=
5000000
;
...
...
@@ -2302,7 +2306,7 @@ static void taosStartDumpInWorkThreads(void* taosCon, struct arguments *args)
if
(
totalThreads
>
tsSqlFileNum
)
{
totalThreads
=
tsSqlFileNum
;
}
SThreadParaObj
*
threadObj
=
(
SThreadParaObj
*
)
calloc
(
totalThreads
,
sizeof
(
SThreadParaObj
));
for
(
int32_t
t
=
0
;
t
<
totalThreads
;
++
t
)
{
pThread
=
threadObj
+
t
;
...
...
@@ -2332,7 +2336,7 @@ static void taosStartDumpInWorkThreads(void* taosCon, struct arguments *args)
int
taosDumpIn
(
struct
arguments
*
arguments
)
{
assert
(
arguments
->
isDumpIn
);
TAOS
*
taos
=
NULL
;
FILE
*
fp
=
NULL
;
...
...
@@ -2347,22 +2351,22 @@ int taosDumpIn(struct arguments *arguments) {
int32_t
tsSqlFileNumOfTbls
=
tsSqlFileNum
;
if
(
tsDbSqlFile
[
0
]
!=
0
)
{
tsSqlFileNumOfTbls
--
;
fp
=
taosOpenDumpInFile
(
tsDbSqlFile
);
if
(
NULL
==
fp
)
{
fprintf
(
stderr
,
"failed to open input file %s
\n
"
,
tsDbSqlFile
);
return
-
1
;
}
fprintf
(
stderr
,
"Success Open input file: %s
\n
"
,
tsDbSqlFile
);
taosLoadFileCharset
(
fp
,
tsfCharset
);
taosDumpInOneFile
(
taos
,
fp
,
tsfCharset
,
arguments
->
encode
,
tsDbSqlFile
);
}
if
(
0
!=
tsSqlFileNumOfTbls
)
{
taosStartDumpInWorkThreads
(
taos
,
arguments
);
}
}
taos_close
(
taos
);
taosFreeSQLFiles
();
...
...
src/plugins/http/inc/httpInt.h
浏览文件 @
763f7126
...
...
@@ -171,7 +171,7 @@ typedef struct HttpThread {
EpollFd
pollFd
;
int32_t
numOfContexts
;
int32_t
threadId
;
char
label
[
HTTP_LABEL_SIZE
];
char
label
[
HTTP_LABEL_SIZE
<<
1
];
bool
(
*
processData
)(
HttpContext
*
pContext
);
}
HttpThread
;
...
...
src/util/src/tnettest.c
浏览文件 @
763f7126
...
...
@@ -539,7 +539,7 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
}
void
taosNetTest
(
char
*
role
,
char
*
host
,
int32_t
port
,
int32_t
pkgLen
)
{
//
tscEmbedded = 1;
tscEmbedded
=
1
;
if
(
host
==
NULL
)
host
=
tsLocalFqdn
;
if
(
port
==
0
)
port
=
tsServerPort
;
if
(
pkgLen
<=
10
)
pkgLen
=
1000
;
...
...
@@ -550,6 +550,7 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
}
else
if
(
0
==
strcmp
(
"server"
,
role
))
{
taosNetTestServer
(
host
,
port
,
pkgLen
);
}
else
if
(
0
==
strcmp
(
"rpc"
,
role
))
{
tscEmbedded
=
0
;
taosNetTestRpc
(
host
,
port
,
pkgLen
);
}
else
if
(
0
==
strcmp
(
"sync"
,
role
))
{
taosNetCheckSync
(
host
,
port
);
...
...
@@ -559,5 +560,5 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
taosNetTestStartup
(
host
,
port
);
}
//
tscEmbedded = 0;
tscEmbedded
=
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录