Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
807e86a3
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
807e86a3
编写于
7月 08, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
7月 08, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2594 from taosdata/hotfix/test
[coverity scan]
上级
af1d98fc
fb55414c
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
1317 addition
and
574 deletion
+1317
-574
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+10
-0
src/kit/taosdump/taosdump.c
src/kit/taosdump/taosdump.c
+1298
-574
src/kit/taosmigrate/taosmigrate.c
src/kit/taosmigrate/taosmigrate.c
+1
-0
src/kit/taosmigrate/taosmigrateMnodeWal.c
src/kit/taosmigrate/taosmigrateMnodeWal.c
+6
-0
src/kit/taosmigrate/taosmigrateVnodeCfg.c
src/kit/taosmigrate/taosmigrateVnodeCfg.c
+2
-0
未找到文件。
src/kit/taosdemo/taosdemo.c
浏览文件 @
807e86a3
...
...
@@ -870,6 +870,11 @@ void *readTable(void *sarg) {
int64_t
sTime
=
rinfo
->
start_time
;
char
*
tb_prefix
=
rinfo
->
tb_prefix
;
FILE
*
fp
=
fopen
(
rinfo
->
fp
,
"a"
);
if
(
NULL
==
fp
)
{
printf
(
"fopen %s fail, reason:%s.
\n
"
,
rinfo
->
fp
,
strerror
(
errno
));
return
NULL
;
}
int
num_of_DPT
=
rinfo
->
nrecords_per_table
;
int
num_of_tables
=
rinfo
->
end_table_id
-
rinfo
->
start_table_id
+
1
;
int
totalData
=
num_of_DPT
*
num_of_tables
;
...
...
@@ -925,6 +930,11 @@ void *readMetric(void *sarg) {
TAOS
*
taos
=
rinfo
->
taos
;
char
command
[
BUFFER_SIZE
]
=
"
\0
"
;
FILE
*
fp
=
fopen
(
rinfo
->
fp
,
"a"
);
if
(
NULL
==
fp
)
{
printf
(
"fopen %s fail, reason:%s.
\n
"
,
rinfo
->
fp
,
strerror
(
errno
));
return
NULL
;
}
int
num_of_DPT
=
rinfo
->
nrecords_per_table
;
int
num_of_tables
=
rinfo
->
end_table_id
-
rinfo
->
start_table_id
+
1
;
int
totalData
=
num_of_DPT
*
num_of_tables
;
...
...
src/kit/taosdump/taosdump.c
浏览文件 @
807e86a3
...
...
@@ -27,19 +27,18 @@
#include <unistd.h>
#include <wordexp.h>
#include <iconv.h>
#include <time.h>
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tsclient.h"
#include "t
aosdef
.h"
#include "t
sdb
.h"
#include "tutil.h"
#include "tglobal.h"
#define COMMAND_SIZE 65536
#define DEFAULT_DUMP_FILE "taosdump.sql"
#define MAX_DBS 100
//#define DEFAULT_DUMP_FILE "taosdump.sql"
int
converStringToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
);
int
convertNCharToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
);
...
...
@@ -90,21 +89,21 @@ enum _describe_table_index {
};
typedef
struct
{
char
field
[
TSDB_COL_NAME_LEN
];
char
field
[
TSDB_COL_NAME_LEN
+
1
];
char
type
[
16
];
int
length
;
char
note
[
128
];
}
SColDes
;
typedef
struct
{
char
name
[
TSDB_COL_NAME_LEN
];
char
name
[
TSDB_COL_NAME_LEN
+
1
];
SColDes
cols
[];
}
STableDef
;
extern
char
version
[];
typedef
struct
{
char
name
[
TSDB_DB_NAME_LEN
];
char
name
[
TSDB_DB_NAME_LEN
+
1
];
int32_t
replica
;
int32_t
days
;
int32_t
keep
;
...
...
@@ -119,8 +118,8 @@ typedef struct {
}
SDbInfo
;
typedef
struct
{
char
name
[
TSDB_TABLE_NAME_LEN
];
char
metric
[
TSDB_TABLE_NAME_LEN
];
char
name
[
TSDB_TABLE_NAME_LEN
+
1
];
char
metric
[
TSDB_TABLE_NAME_LEN
+
1
];
}
STableRecord
;
typedef
struct
{
...
...
@@ -128,6 +127,16 @@ typedef struct {
STableRecord
tableRecord
;
}
STableRecordInfo
;
typedef
struct
{
pthread_t
threadID
;
int32_t
threadIndex
;
int32_t
totalThreads
;
char
dbName
[
TSDB_TABLE_NAME_LEN
+
1
];
void
*
taosCon
;
}
SThreadParaObj
;
static
int64_t
totalDumpOutRows
=
0
;
SDbInfo
**
dbInfos
=
NULL
;
const
char
*
argp_program_version
=
version
;
...
...
@@ -142,7 +151,7 @@ static char doc[] = "";
/* to force a line-break, e.g.\n<-- here."; */
/* A description of the arguments we accept. */
static
char
args_doc
[]
=
"dbname [tbname ...]
\n
--databases dbname ...
\n
--all-databases
\n
-i inp
ut_file
"
;
static
char
args_doc
[]
=
"dbname [tbname ...]
\n
--databases dbname ...
\n
--all-databases
\n
-i inp
ath
\n
-o outpath
"
;
/* Keys for options without short-options. */
#define OPT_ABORT 1
/* –abort */
...
...
@@ -154,9 +163,11 @@ static struct argp_option options[] = {
{
"user"
,
'u'
,
"USER"
,
0
,
"User name used to connect to server. Default is root."
,
0
},
{
"password"
,
'p'
,
"PASSWORD"
,
0
,
"User password to connect to server. Default is taosdata."
,
0
},
{
"port"
,
'P'
,
"PORT"
,
0
,
"Port to connect"
,
0
},
{
"cversion"
,
'v'
,
"CVERION"
,
0
,
"client version"
,
0
},
{
"mysqlFlag"
,
'q'
,
"MYSQLFLAG"
,
0
,
"mysqlFlag, Default is 0"
,
0
},
// input/output file
{
"outp
ut"
,
'o'
,
"OUTPUT"
,
0
,
"Output file name."
,
1
},
{
"inp
ut"
,
'i'
,
"INPUT"
,
0
,
"Input file name."
,
1
},
{
"outp
ath"
,
'o'
,
"OUTPATH"
,
0
,
"Output file path."
,
1
},
{
"inp
ath"
,
'i'
,
"INPATH"
,
0
,
"Input file path."
,
1
},
{
"config"
,
'c'
,
"CONFIG_DIR"
,
0
,
"Configure directory. Default is /etc/taos/taos.cfg."
,
1
},
{
"encode"
,
'e'
,
"ENCODE"
,
0
,
"Input file encoding."
,
1
},
// dump unit options
...
...
@@ -168,19 +179,23 @@ static struct argp_option options[] = {
{
"start-time"
,
'S'
,
"START_TIME"
,
0
,
"Start time to dump."
,
3
},
{
"end-time"
,
'E'
,
"END_TIME"
,
0
,
"End time to dump."
,
3
},
{
"data-batch"
,
'N'
,
"DATA_BATCH"
,
0
,
"Number of data point per insert statement. Default is 1."
,
3
},
{
"table-batch"
,
'T'
,
"TABLE_BATCH"
,
0
,
"Number of table dumpout into one output file. Default is 1."
,
3
},
{
"thread_num"
,
't'
,
"THREAD_NUM"
,
0
,
"Number of thread for dump in file. Default is 5."
,
3
},
{
"allow-sys"
,
'a'
,
0
,
0
,
"Allow to dump sys database"
,
3
},
{
0
}};
/* Used by main to communicate with parse_opt. */
typedef
struct
SDumpA
rguments
{
struct
a
rguments
{
// connection option
char
*
host
;
char
*
user
;
char
*
password
;
uint16_t
port
;
char
cversion
[
TSDB_FILENAME_LEN
+
1
];
uint16_t
mysqlFlag
;
// output file
char
output
[
TSDB_FILENAME_LEN
];
char
input
[
TSDB_FILENAME_LEN
];
char
outpath
[
TSDB_FILENAME_LEN
+
1
];
char
inpath
[
TSDB_FILENAME_LEN
+
1
];
char
*
encode
;
// dump unit option
bool
all_databases
;
...
...
@@ -190,20 +205,22 @@ typedef struct SDumpArguments {
bool
with_property
;
int64_t
start_time
;
int64_t
end_time
;
int
data_batch
;
int32_t
data_batch
;
int32_t
table_batch
;
// num of table which will be dump into one output file.
bool
allow_sys
;
// other options
int32_t
thread_num
;
int
abort
;
char
**
arg_list
;
int
arg_list_len
;
bool
isDumpIn
;
}
SDumpArguments
;
};
/* Parse a single option. */
static
error_t
parse_opt
(
int
key
,
char
*
arg
,
struct
argp_state
*
state
)
{
/* Get the input argument from argp_parse, which we
know is a pointer to our arguments structure. */
SDumpA
rguments
*
arguments
=
state
->
input
;
struct
a
rguments
*
arguments
=
state
->
input
;
wordexp_t
full_path
;
switch
(
key
)
{
...
...
@@ -223,13 +240,24 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case
'P'
:
arguments
->
port
=
atoi
(
arg
);
break
;
// output file
case
'q'
:
arguments
->
mysqlFlag
=
atoi
(
arg
);
break
;
case
'v'
:
if
(
wordexp
(
arg
,
&
full_path
,
0
)
!=
0
)
{
fprintf
(
stderr
,
"Invalid client vesion %s
\n
"
,
arg
);
return
-
1
;
}
strcpy
(
arguments
->
cversion
,
full_path
.
we_wordv
[
0
]);
wordfree
(
&
full_path
);
break
;
// output file path
case
'o'
:
if
(
wordexp
(
arg
,
&
full_path
,
0
)
!=
0
)
{
fprintf
(
stderr
,
"Invalid path %s
\n
"
,
arg
);
return
-
1
;
}
tstrncpy
(
arguments
->
output
,
full_path
.
we_wordv
[
0
],
TSDB_FILENAME_LEN
);
strcpy
(
arguments
->
outpath
,
full_path
.
we_wordv
[
0
]
);
wordfree
(
&
full_path
);
break
;
case
'i'
:
...
...
@@ -238,7 +266,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
fprintf
(
stderr
,
"Invalid path %s
\n
"
,
arg
);
return
-
1
;
}
tstrncpy
(
arguments
->
input
,
full_path
.
we_wordv
[
0
],
TSDB_FILENAME_LEN
);
strcpy
(
arguments
->
inpath
,
full_path
.
we_wordv
[
0
]
);
wordfree
(
&
full_path
);
break
;
case
'c'
:
...
...
@@ -246,7 +274,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
fprintf
(
stderr
,
"Invalid path %s
\n
"
,
arg
);
return
-
1
;
}
tstrncpy
(
configDir
,
full_path
.
we_wordv
[
0
],
TSDB_FILENAME_LEN
);
strcpy
(
configDir
,
full_path
.
we_wordv
[
0
]
);
wordfree
(
&
full_path
);
break
;
case
'e'
:
...
...
@@ -276,6 +304,12 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case
'N'
:
arguments
->
data_batch
=
atoi
(
arg
);
break
;
case
'T'
:
arguments
->
table_batch
=
atoi
(
arg
);
break
;
case
't'
:
arguments
->
thread_num
=
atoi
(
arg
);
break
;
case
OPT_ABORT
:
arguments
->
abort
=
1
;
break
;
...
...
@@ -294,52 +328,70 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Our argp parser. */
static
struct
argp
argp
=
{
options
,
parse_opt
,
args_doc
,
doc
};
TAOS
*
taos
=
NULL
;
char
*
command
=
NULL
;
char
*
lcommand
=
NULL
;
char
*
buffer
=
NULL
;
int
taosDumpOut
(
SDumpArguments
*
arguments
);
int
taosDumpIn
(
SDumpArguments
*
arguments
);
int
taosDumpOut
(
struct
arguments
*
arguments
);
int
taosDumpIn
(
struct
arguments
*
arguments
);
void
taosDumpCreateDbClause
(
SDbInfo
*
dbInfo
,
bool
isDumpProperty
,
FILE
*
fp
);
int
taosDumpDb
(
SDbInfo
*
dbInfo
,
SDumpArguments
*
arguments
,
FILE
*
fp
);
void
taosDumpCreateTableClause
(
STableDef
*
tableDes
,
int
numOfCols
,
SDumpArguments
*
arguments
,
FILE
*
fp
);
void
taosDumpCreateMTableClause
(
STableDef
*
tableDes
,
char
*
metric
,
int
numOfCols
,
SDumpArguments
*
arguments
,
FILE
*
fp
);
int32_t
taosDumpTable
(
char
*
table
,
char
*
metric
,
SDumpArguments
*
arguments
,
FILE
*
fp
);
int32_t
taosDumpMetric
(
char
*
metric
,
SDumpArguments
*
arguments
,
FILE
*
fp
);
int
taosDumpTableData
(
FILE
*
fp
,
char
*
tbname
,
SDumpArguments
*
arguments
);
int
taosCheckParam
(
SDumpArguments
*
arguments
);
int
taosDumpDb
(
SDbInfo
*
dbInfo
,
struct
arguments
*
arguments
,
FILE
*
fp
,
TAOS
*
taosCon
);
int32_t
taosDumpStable
(
char
*
table
,
FILE
*
fp
,
TAOS
*
taosCon
);
void
taosDumpCreateTableClause
(
STableDef
*
tableDes
,
int
numOfCols
,
FILE
*
fp
);
void
taosDumpCreateMTableClause
(
STableDef
*
tableDes
,
char
*
metric
,
int
numOfCols
,
FILE
*
fp
);
int32_t
taosDumpTable
(
char
*
table
,
char
*
metric
,
struct
arguments
*
arguments
,
FILE
*
fp
,
TAOS
*
taosCon
);
int
taosDumpTableData
(
FILE
*
fp
,
char
*
tbname
,
struct
arguments
*
arguments
,
TAOS
*
taosCon
);
int
taosCheckParam
(
struct
arguments
*
arguments
);
void
taosFreeDbInfos
();
static
void
taosStartDumpOutWorkThreads
(
struct
arguments
*
args
,
int32_t
numOfThread
,
char
*
dbName
);
int
main
(
int
argc
,
char
*
argv
[])
{
SDumpArguments
arguments
=
{
struct
arguments
tsArguments
=
{
// connection option
NULL
,
TSDB_DEFAULT_USER
,
TSDB_DEFAULT_PASS
,
0
,
// output file
DEFAULT_DUMP_FILE
,
DEFAULT_DUMP_FILE
,
NULL
,
NULL
,
"root"
,
"taosdata"
,
0
,
""
,
0
,
// outpath and inpath
""
,
""
,
NULL
,
// dump unit option
false
,
false
,
false
,
false
,
// dump format option
false
,
false
,
0
,
INT64_MAX
,
1
,
false
,
false
,
false
,
0
,
INT64_MAX
,
1
,
1
,
false
,
// other options
0
,
NULL
,
0
,
false
};
5
,
0
,
NULL
,
0
,
false
};
int
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
TAOS_RES
*
pSql
=
NULL
;
int32_t
code
=
-
1
;
pSql
=
taos_query
(
taos
,
command
);
code
=
taos_errno
(
pSql
);
if
(
code
)
{
fprintf
(
stderr
,
"sql error: %s, reason:%s
\n
"
,
command
,
taos_errstr
(
pSql
));
}
taos_free_result
(
pSql
);
return
code
;
}
int
main
(
int
argc
,
char
*
argv
[])
{
/* Parse our arguments; every option seen by parse_opt will be
reflected in arguments. */
argp_parse
(
&
argp
,
argc
,
argv
,
0
,
0
,
&
a
rguments
);
argp_parse
(
&
argp
,
argc
,
argv
,
0
,
0
,
&
tsA
rguments
);
if
(
a
rguments
.
abort
)
{
if
(
tsA
rguments
.
abort
)
{
#ifndef _ALPINE
error
(
10
,
0
,
"ABORTED"
);
#else
...
...
@@ -347,14 +399,48 @@ int main(int argc, char *argv[]) {
#endif
}
if
(
taosCheckParam
(
&
arguments
)
<
0
)
{
printf
(
"====== arguments config ======
\n
"
);
{
printf
(
"host: %s
\n
"
,
tsArguments
.
host
);
printf
(
"user: %s
\n
"
,
tsArguments
.
user
);
printf
(
"password: %s
\n
"
,
tsArguments
.
password
);
printf
(
"port: %u
\n
"
,
tsArguments
.
port
);
printf
(
"cversion: %s
\n
"
,
tsArguments
.
cversion
);
printf
(
"mysqlFlag: %d"
,
tsArguments
.
mysqlFlag
);
printf
(
"outpath: %s
\n
"
,
tsArguments
.
outpath
);
printf
(
"inpath: %s
\n
"
,
tsArguments
.
inpath
);
printf
(
"encode: %s
\n
"
,
tsArguments
.
encode
);
printf
(
"all_databases: %d
\n
"
,
tsArguments
.
all_databases
);
printf
(
"databases: %d
\n
"
,
tsArguments
.
databases
);
printf
(
"schemaonly: %d
\n
"
,
tsArguments
.
schemaonly
);
printf
(
"with_property: %d
\n
"
,
tsArguments
.
with_property
);
printf
(
"start_time: %"
PRId64
"
\n
"
,
tsArguments
.
start_time
);
printf
(
"end_time: %"
PRId64
"
\n
"
,
tsArguments
.
end_time
);
printf
(
"data_batch: %d
\n
"
,
tsArguments
.
data_batch
);
printf
(
"table_batch: %d
\n
"
,
tsArguments
.
table_batch
);
printf
(
"allow_sys: %d
\n
"
,
tsArguments
.
allow_sys
);
printf
(
"abort: %d
\n
"
,
tsArguments
.
abort
);
printf
(
"isDumpIn: %d
\n
"
,
tsArguments
.
isDumpIn
);
printf
(
"arg_list_len: %d
\n
"
,
tsArguments
.
arg_list_len
);
for
(
int32_t
i
=
0
;
i
<
tsArguments
.
arg_list_len
;
i
++
)
{
printf
(
"arg_list[%d]: %s
\n
"
,
i
,
tsArguments
.
arg_list
[
i
]);
}
}
printf
(
"==============================
\n
"
);
if
(
tsArguments
.
cversion
[
0
]
!=
0
){
strcpy
(
version
,
tsArguments
.
cversion
);
}
if
(
taosCheckParam
(
&
tsArguments
)
<
0
)
{
exit
(
EXIT_FAILURE
);
}
if
(
a
rguments
.
isDumpIn
)
{
if
(
taosDumpIn
(
&
a
rguments
)
<
0
)
return
-
1
;
if
(
tsA
rguments
.
isDumpIn
)
{
if
(
taosDumpIn
(
&
tsA
rguments
)
<
0
)
return
-
1
;
}
else
{
if
(
taosDumpOut
(
&
a
rguments
)
<
0
)
return
-
1
;
if
(
taosDumpOut
(
&
tsA
rguments
)
<
0
)
return
-
1
;
}
return
0
;
...
...
@@ -362,96 +448,214 @@ int main(int argc, char *argv[]) {
void
taosFreeDbInfos
()
{
if
(
dbInfos
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
MAX_DBS
;
i
++
)
tfree
(
dbInfos
[
i
]);
for
(
int
i
=
0
;
i
<
128
;
i
++
)
tfree
(
dbInfos
[
i
]);
tfree
(
dbInfos
);
}
int
taosGetTableRecordInfo
(
char
*
table
,
STableRecordInfo
*
pTableRecordInfo
)
{
// check table is normal table or super table
int
taosGetTableRecordInfo
(
char
*
table
,
STableRecordInfo
*
pTableRecordInfo
,
TAOS
*
taosCon
)
{
TAOS_ROW
row
=
NULL
;
bool
isSet
=
false
;
TAOS_RES
*
result
=
NULL
;
memset
(
pTableRecordInfo
,
0
,
sizeof
(
STableRecordInfo
));
sprintf
(
command
,
"show tables like %s"
,
table
);
TAOS_RES
*
result
=
taos_query
(
taos
,
command
);
\
char
*
tempCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tempCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
-
1
;
}
sprintf
(
tempCommand
,
"show tables like %s"
,
table
);
result
=
taos_query
(
taosCon
,
tempCommand
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, error: %s
\n
"
,
command
,
taos_errstr
(
result
));
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tempCommand
);
free
(
tempCommand
);
taos_free_result
(
result
);
return
-
1
;
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
if
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
isSet
=
true
;
pTableRecordInfo
->
isMetric
=
false
;
strncpy
(
pTableRecordInfo
->
tableRecord
.
name
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_NAME_INDEX
],
fields
[
TSDB_SHOW_TABLES_NAME_INDEX
].
bytes
);
strncpy
(
pTableRecordInfo
->
tableRecord
.
metric
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_METRIC_INDEX
],
fields
[
TSDB_SHOW_TABLES_METRIC_INDEX
].
bytes
);
break
;
}
taos_free_result
(
result
);
result
=
NULL
;
if
(
isSet
)
return
0
;
if
(
isSet
)
{
free
(
tempCommand
);
return
0
;
}
sprintf
(
c
ommand
,
"show stables like %s"
,
table
);
sprintf
(
tempC
ommand
,
"show stables like %s"
,
table
);
result
=
taos_query
(
taos
,
command
);
result
=
taos_query
(
taos
Con
,
tempCommand
);
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, error: %s
\n
"
,
command
,
taos_errstr
(
result
));
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tempCommand
);
free
(
tempCommand
);
taos_free_result
(
result
);
return
-
1
;
}
if
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
isSet
=
true
;
pTableRecordInfo
->
isMetric
=
true
;
tstrncpy
(
pTableRecordInfo
->
tableRecord
.
metric
,
table
,
TSDB_TABLE_NAME_LEN
);
strcpy
(
pTableRecordInfo
->
tableRecord
.
metric
,
table
);
break
;
}
taos_free_result
(
result
);
result
=
NULL
;
if
(
isSet
)
return
0
;
if
(
isSet
)
{
free
(
tempCommand
);
return
0
;
}
fprintf
(
stderr
,
"invalid table/metric %s
\n
"
,
table
);
free
(
tempCommand
);
return
-
1
;
}
int32_t
taosSaveAllNormalTableToTempFile
(
TAOS
*
taosCon
,
char
*
meter
,
char
*
metric
,
int
*
fd
)
{
STableRecord
tableRecord
;
if
(
-
1
==
*
fd
)
{
*
fd
=
open
(
".tables.tmp.0"
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
*
fd
==
-
1
)
{
fprintf
(
stderr
,
"failed to open temp file: .tables.tmp.0
\n
"
);
return
-
1
;
}
}
memset
(
tableRecord
.
name
,
0
,
sizeof
(
STableRecord
));
strcpy
(
tableRecord
.
name
,
meter
);
strcpy
(
tableRecord
.
metric
,
metric
);
twrite
(
*
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
return
0
;
}
int32_t
taosSaveTableOfMetricToTempFile
(
TAOS
*
taosCon
,
char
*
metric
,
struct
arguments
*
arguments
,
int32_t
*
totalNumOfThread
)
{
TAOS_ROW
row
;
int
fd
=
-
1
;
STableRecord
tableRecord
;
char
*
tmpCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
-
1
;
}
sprintf
(
tmpCommand
,
"select tbname from %s"
,
metric
);
TAOS_RES
*
result
=
taos_query
(
taosCon
,
tmpCommand
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tmpCommand
);
free
(
tmpCommand
);
taos_free_result
(
result
);
return
-
1
;
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
int32_t
numOfTable
=
0
;
int32_t
numOfThread
=
*
totalNumOfThread
;
char
tmpFileName
[
TSDB_FILENAME_LEN
+
1
];
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
if
(
0
==
numOfTable
)
{
memset
(
tmpFileName
,
0
,
TSDB_FILENAME_LEN
);
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
numOfThread
);
fd
=
open
(
tmpFileName
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
==
-
1
)
{
fprintf
(
stderr
,
"failed to open temp file: %s
\n
"
,
tmpFileName
);
taos_free_result
(
result
);
for
(
int32_t
loopCnt
=
0
;
loopCnt
<
numOfThread
;
loopCnt
++
)
{
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
loopCnt
);
remove
(
tmpFileName
);
}
free
(
tmpCommand
);
return
-
1
;
}
numOfThread
++
;
}
memset
(
tableRecord
.
name
,
0
,
sizeof
(
STableRecord
));
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
0
],
fields
[
0
].
bytes
);
strcpy
(
tableRecord
.
metric
,
metric
);
twrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
numOfTable
++
;
if
(
numOfTable
>=
arguments
->
table_batch
)
{
numOfTable
=
0
;
tclose
(
fd
);
fd
=
-
1
;
}
}
tclose
(
fd
);
fd
=
-
1
;
taos_free_result
(
result
);
*
totalNumOfThread
=
numOfThread
;
free
(
tmpCommand
);
return
0
;
}
int
taosDumpOut
(
SDumpArguments
*
arguments
)
{
int
taosDumpOut
(
struct
arguments
*
arguments
)
{
TAOS
*
taos
=
NULL
;
TAOS_RES
*
result
=
NULL
;
char
*
command
=
NULL
;
TAOS_ROW
row
;
TAOS_RES
*
result
=
NULL
;
char
*
temp
=
NULL
;
FILE
*
fp
=
NULL
;
int
count
=
0
;
int
32_t
count
=
0
;
STableRecordInfo
tableRecordInfo
;
fp
=
fopen
(
arguments
->
output
,
"w"
);
char
tmpBuf
[
TSDB_FILENAME_LEN
+
9
]
=
{
0
};
if
(
arguments
->
outpath
[
0
]
!=
0
)
{
sprintf
(
tmpBuf
,
"%s/dbs.sql"
,
arguments
->
outpath
);
}
else
{
sprintf
(
tmpBuf
,
"dbs.sql"
);
}
fp
=
fopen
(
tmpBuf
,
"w"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"failed to open file %s
\n
"
,
arguments
->
output
);
fprintf
(
stderr
,
"failed to open file %s
\n
"
,
tmpBuf
);
return
-
1
;
}
dbInfos
=
(
SDbInfo
**
)
calloc
(
MAX_DBS
,
sizeof
(
SDbInfo
*
));
dbInfos
=
(
SDbInfo
**
)
calloc
(
128
,
sizeof
(
SDbInfo
*
));
if
(
dbInfos
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
goto
_exit_failure
;
}
temp
=
(
char
*
)
malloc
(
2
*
COMMAND_SIZE
);
if
(
temp
==
NULL
)
{
command
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
command
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
goto
_exit_failure
;
}
command
=
temp
;
buffer
=
command
+
COMMAND_SIZE
;
/* Connect to server */
taos
=
taos_connect
(
arguments
->
host
,
arguments
->
user
,
arguments
->
password
,
NULL
,
arguments
->
port
);
if
(
taos
==
NULL
)
{
...
...
@@ -467,27 +671,28 @@ int taosDumpOut(SDumpArguments *arguments) {
sprintf
(
command
,
"show databases"
);
result
=
taos_query
(
taos
,
command
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command: %s, reason: %s
\n
"
,
command
,
taos_errstr
(
result
));
taos_free_result
(
result
);
fprintf
(
stderr
,
"failed to run command: %s, reason: %s
\n
"
,
command
,
taos_errstr
(
taos
));
goto
_exit_failure
;
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
// sys database name : 'monitor', but subsequent version changed to 'log'
if
(
strncasecmp
(
row
[
TSDB_SHOW_DB_NAME_INDEX
],
"monitor"
,
fields
[
TSDB_SHOW_DB_NAME_INDEX
].
bytes
)
==
0
&&
(
!
arguments
->
allow_sys
))
continue
;
if
(
arguments
->
databases
)
{
if
(
arguments
->
databases
)
{
// input multi dbs
for
(
int
i
=
0
;
arguments
->
arg_list
[
i
];
i
++
)
{
if
(
strncasecmp
(
arguments
->
arg_list
[
i
],
(
char
*
)
row
[
TSDB_SHOW_DB_NAME_INDEX
],
fields
[
TSDB_SHOW_DB_NAME_INDEX
].
bytes
)
==
0
)
goto
_dump_db_point
;
}
continue
;
}
else
if
(
!
arguments
->
all_databases
)
{
}
else
if
(
!
arguments
->
all_databases
)
{
// only input one db
if
(
strncasecmp
(
arguments
->
arg_list
[
0
],
(
char
*
)
row
[
TSDB_SHOW_DB_NAME_INDEX
],
fields
[
TSDB_SHOW_DB_NAME_INDEX
].
bytes
)
==
0
)
goto
_dump_db_point
;
...
...
@@ -504,7 +709,7 @@ int taosDumpOut(SDumpArguments *arguments) {
}
strncpy
(
dbInfos
[
count
]
->
name
,
(
char
*
)
row
[
TSDB_SHOW_DB_NAME_INDEX
],
fields
[
TSDB_SHOW_DB_NAME_INDEX
].
bytes
);
if
(
strcmp
(
arguments
->
user
,
TSDB_DEFAULT_USER
)
==
0
)
{
#if 0
dbInfos[count]->replica = (int)(*((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]));
dbInfos[count]->days = (int)(*((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]));
dbInfos[count]->keep = *((int *)row[TSDB_SHOW_DB_KEEP_INDEX]);
...
...
@@ -516,7 +721,7 @@ int taosDumpOut(SDumpArguments *arguments) {
dbInfos[count]->ctime = *((int *)row[TSDB_SHOW_DB_CTIME_INDEX]);
dbInfos[count]->clog = (int)(*((int8_t *)row[TSDB_SHOW_DB_CLOG_INDEX]));
dbInfos[count]->comp = (int)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
}
#endif
count
++
;
...
...
@@ -528,42 +733,71 @@ int taosDumpOut(SDumpArguments *arguments) {
}
}
// taos_free_result(result);
if
(
count
==
0
)
{
fprintf
(
stderr
,
"No databases valid to dump
\n
"
);
goto
_exit_failure
;
}
if
(
arguments
->
databases
||
arguments
->
all_databases
)
{
if
(
arguments
->
databases
||
arguments
->
all_databases
)
{
// case: taosdump --databases dbx dby ... OR taosdump --all-databases
for
(
int
i
=
0
;
i
<
count
;
i
++
)
{
(
void
)
taosDumpDb
(
dbInfos
[
i
],
arguments
,
fp
);
taosDumpDb
(
dbInfos
[
i
],
arguments
,
fp
,
taos
);
}
}
else
{
if
(
arguments
->
arg_list_len
==
1
)
{
(
void
)
taosDumpDb
(
dbInfos
[
0
],
arguments
,
fp
);
}
else
{
if
(
arguments
->
arg_list_len
==
1
)
{
// case: taosdump <db>
taosDumpDb
(
dbInfos
[
0
],
arguments
,
fp
,
taos
);
}
else
{
// case: taosdump <db> tablex tabley ...
taosDumpCreateDbClause
(
dbInfos
[
0
],
arguments
->
with_property
,
fp
);
sprintf
(
command
,
"use %s"
,
dbInfos
[
0
]
->
name
);
if
(
taos_query
(
taos
,
command
)
==
NULL
)
{
result
=
taos_query
(
taos
,
command
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"invalid database %s
\n
"
,
dbInfos
[
0
]
->
name
);
goto
_exit_failure
;
}
fprintf
(
fp
,
"USE %s;
\n\n
"
,
dbInfos
[
0
]
->
name
);
int32_t
totalNumOfThread
=
1
;
// 0: all normal talbe into .tables.tmp.0
int
normalTblFd
=
-
1
;
int32_t
retCode
;
for
(
int
i
=
1
;
arguments
->
arg_list
[
i
];
i
++
)
{
if
(
taosGetTableRecordInfo
(
arguments
->
arg_list
[
i
],
&
tableRecordInfo
)
<
0
)
{
fprintf
(
stderr
,
"invalide table %s
\n
"
,
arguments
->
arg_list
[
i
]);
if
(
taosGetTableRecordInfo
(
arguments
->
arg_list
[
i
],
&
tableRecordInfo
,
taos
)
<
0
)
{
fprintf
(
stderr
,
"in
put the in
valide table %s
\n
"
,
arguments
->
arg_list
[
i
]);
continue
;
}
if
(
tableRecordInfo
.
isMetric
)
{
// dump whole metric
(
void
)
taosDumpMetric
(
tableRecordInfo
.
tableRecord
.
metric
,
arguments
,
fp
);
}
else
{
// dump MTable and NTable
(
void
)
taosDumpTable
(
tableRecordInfo
.
tableRecord
.
name
,
tableRecordInfo
.
tableRecord
.
metric
,
arguments
,
fp
);
if
(
tableRecordInfo
.
isMetric
)
{
// dump all table of this metric
(
void
)
taosDumpStable
(
tableRecordInfo
.
tableRecord
.
metric
,
fp
,
taos
);
retCode
=
taosSaveTableOfMetricToTempFile
(
taos
,
tableRecordInfo
.
tableRecord
.
metric
,
arguments
,
&
totalNumOfThread
);
}
else
{
if
(
tableRecordInfo
.
tableRecord
.
metric
[
0
]
!=
'\0'
)
{
// dump this sub table and it's metric
(
void
)
taosDumpStable
(
tableRecordInfo
.
tableRecord
.
metric
,
fp
,
taos
);
}
retCode
=
taosSaveAllNormalTableToTempFile
(
taos
,
tableRecordInfo
.
tableRecord
.
name
,
tableRecordInfo
.
tableRecord
.
metric
,
&
normalTblFd
);
}
if
(
retCode
<
0
)
{
if
(
-
1
!=
normalTblFd
){
tclose
(
normalTblFd
);
}
goto
_clean_tmp_file
;
}
}
if
(
-
1
!=
normalTblFd
){
tclose
(
normalTblFd
);
}
// start multi threads to dumpout
taosStartDumpOutWorkThreads
(
arguments
,
totalNumOfThread
,
dbInfos
[
0
]
->
name
);
char
tmpFileName
[
TSDB_FILENAME_LEN
+
1
];
_clean_tmp_file:
for
(
int
loopCnt
=
0
;
loopCnt
<
totalNumOfThread
;
loopCnt
++
)
{
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
loopCnt
);
remove
(
tmpFileName
);
}
}
}
...
...
@@ -572,413 +806,600 @@ int taosDumpOut(SDumpArguments *arguments) {
fclose
(
fp
);
taos_close
(
taos
);
taos_free_result
(
result
);
tfree
(
temp
);
tfree
(
command
);
taosFreeDbInfos
();
fprintf
(
stderr
,
"dump out rows: %"
PRId64
"
\n
"
,
totalDumpOutRows
);
return
0
;
_exit_failure:
fclose
(
fp
);
taos_close
(
taos
);
taos_free_result
(
result
);
tfree
(
temp
);
tfree
(
command
);
taosFreeDbInfos
();
fprintf
(
stderr
,
"dump out rows: %"
PRId64
"
\n
"
,
totalDumpOutRows
);
return
-
1
;
}
void
taosDumpCreateDbClause
(
SDbInfo
*
dbInfo
,
bool
isDumpProperty
,
FILE
*
fp
)
{
char
*
pstr
=
buffer
;
int
taosGetTableDes
(
char
*
table
,
STableDef
*
tableDes
,
TAOS
*
taosCon
)
{
TAOS_ROW
row
=
NULL
;
TAOS_RES
*
tmpResult
=
NULL
;
int
count
=
0
;
pstr
+=
sprintf
(
pstr
,
"CREATE DATABASE IF NOT EXISTS %s"
,
dbInfo
->
name
);
if
(
isDumpProperty
)
{
pstr
+=
sprintf
(
pstr
,
" REPLICA %d DAYS %d KEEP %d TABLES %d ROWS %d CACHE %d ABLOCKS %d TBLOCKS %d CTIME %d CLOG %d COMP %d"
,
dbInfo
->
replica
,
dbInfo
->
days
,
dbInfo
->
keep
,
dbInfo
->
tables
,
dbInfo
->
rows
,
dbInfo
->
cache
,
dbInfo
->
ablocks
,
dbInfo
->
tblocks
,
dbInfo
->
ctime
,
dbInfo
->
clog
,
dbInfo
->
comp
);
char
*
tempCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tempCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
-
1
;
}
sprintf
(
tempCommand
,
"describe %s"
,
table
);
tmpResult
=
taos_query
(
taosCon
,
tempCommand
);
int32_t
code
=
taos_errno
(
tmpResult
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tempCommand
);
free
(
tempCommand
);
taos_free_result
(
tmpResult
);
return
-
1
;
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpResult
);
strcpy
(
tableDes
->
name
,
table
);
while
((
row
=
taos_fetch_row
(
tmpResult
))
!=
NULL
)
{
strncpy
(
tableDes
->
cols
[
count
].
field
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_FIELD_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_FIELD_INDEX
].
bytes
);
strncpy
(
tableDes
->
cols
[
count
].
type
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_TYPE_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_TYPE_INDEX
].
bytes
);
tableDes
->
cols
[
count
].
length
=
*
((
int
*
)
row
[
TSDB_DESCRIBE_METRIC_LENGTH_INDEX
]);
strncpy
(
tableDes
->
cols
[
count
].
note
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_NOTE_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_NOTE_INDEX
].
bytes
);
count
++
;
}
fprintf
(
fp
,
"%s
\n\n
"
,
buffer
);
taos_free_result
(
tmpResult
);
tmpResult
=
NULL
;
free
(
tempCommand
);
return
count
;
}
int
taosDumpDb
(
SDbInfo
*
dbInfo
,
SDumpArguments
*
arguments
,
FILE
*
fp
)
{
TAOS_ROW
row
;
int
fd
=
-
1
;
STableRecord
tableRecord
;
int32_t
taosDumpTable
(
char
*
table
,
char
*
metric
,
struct
arguments
*
arguments
,
FILE
*
fp
,
TAOS
*
taosCon
)
{
int
count
=
0
;
taosDumpCreateDbClause
(
dbInfo
,
arguments
->
with_property
,
fp
);
STableDef
*
tableDes
=
(
STableDef
*
)
calloc
(
1
,
sizeof
(
STableDef
)
+
sizeof
(
SColDes
)
*
TSDB_MAX_COLUMNS
);
sprintf
(
command
,
"use %s"
,
dbInfo
->
name
);
if
(
taos_errno
(
taos_query
(
taos
,
command
))
!=
0
)
{
fprintf
(
stderr
,
"invalid database %s
\n
"
,
dbInfo
->
name
);
if
(
metric
!=
NULL
&&
metric
[
0
]
!=
'\0'
)
{
// dump table schema which is created by using super table
/*
count = taosGetTableDes(metric, tableDes, taosCon);
if (count < 0) {
free(tableDes);
return -1;
}
fprintf
(
fp
,
"USE %s
\n\n
"
,
dbInfo
->
name
);
taosDumpCreateTableClause(tableDes, count, fp
);
sprintf
(
command
,
"show tables"
);
TAOS_RES
*
result
=
taos_query
(
taos
,
command
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, error: %s
\n
"
,
command
,
taos_errstr
(
result
));
taos_free_result
(
result
);
memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
*/
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
);
if
(
count
<
0
)
{
free
(
tableDes
);
return
-
1
;
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
taosDumpCreateMTableClause
(
tableDes
,
metric
,
count
,
fp
);
fd
=
open
(
".table.tmp"
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
==
-
1
)
{
fprintf
(
stderr
,
"failed to open temp file
\n
"
);
taos_free_result
(
result
);
}
else
{
// dump table definition
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
);
if
(
count
<
0
)
{
free
(
tableDes
);
return
-
1
;
}
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_NAME_INDEX
],
fields
[
TSDB_SHOW_TABLES_NAME_INDEX
].
bytes
);
strncpy
(
tableRecord
.
metric
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_METRIC_INDEX
],
fields
[
TSDB_SHOW_TABLES_METRIC_INDEX
].
bytes
);
twrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
taosDumpCreateTableClause
(
tableDes
,
count
,
fp
);
}
taos_free_result
(
result
);
free
(
tableDes
);
(
void
)
lseek
(
fd
,
0
,
SEEK_SET
);
return
taosDumpTableData
(
fp
,
table
,
arguments
,
taosCon
);
}
STableRecord
tableInfo
;
while
(
1
)
{
memset
(
&
tableInfo
,
0
,
sizeof
(
STableRecord
));
ssize_t
ret
=
read
(
fd
,
&
tableInfo
,
sizeof
(
STableRecord
));
if
(
ret
<=
0
)
break
;
void
taosDumpCreateDbClause
(
SDbInfo
*
dbInfo
,
bool
isDumpProperty
,
FILE
*
fp
)
{
tableInfo
.
name
[
sizeof
(
tableInfo
.
name
)
-
1
]
=
0
;
tableInfo
.
metric
[
sizeof
(
tableInfo
.
metric
)
-
1
]
=
0
;
taosDumpTable
(
tableInfo
.
name
,
tableInfo
.
metric
,
arguments
,
fp
);
char
*
tmpCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
;
}
close
(
fd
);
(
void
)
remove
(
".table.tmp"
);
char
*
pstr
=
tmpCommand
;
return
0
;
pstr
+=
sprintf
(
pstr
,
"CREATE DATABASE IF NOT EXISTS %s"
,
dbInfo
->
name
);
if
(
isDumpProperty
)
{
pstr
+=
sprintf
(
pstr
,
" REPLICA %d DAYS %d KEEP %d TABLES %d ROWS %d CACHE %d ABLOCKS %d TBLOCKS %d CTIME %d CLOG %d COMP %d"
,
dbInfo
->
replica
,
dbInfo
->
days
,
dbInfo
->
keep
,
dbInfo
->
tables
,
dbInfo
->
rows
,
dbInfo
->
cache
,
dbInfo
->
ablocks
,
dbInfo
->
tblocks
,
dbInfo
->
ctime
,
dbInfo
->
clog
,
dbInfo
->
comp
);
}
pstr
+=
sprintf
(
pstr
,
";"
);
fprintf
(
fp
,
"%s
\n\n
"
,
tmpCommand
);
free
(
tmpCommand
);
}
void
taosDumpCreateTableClause
(
STableDef
*
tableDes
,
int
numOfCols
,
SDumpArguments
*
arguments
,
FILE
*
fp
)
{
char
*
pstr
=
NULL
;
pstr
=
buffer
;
int
counter
=
0
;
int
count_temp
=
0
;
void
*
taosDumpOutWorkThreadFp
(
void
*
arg
)
{
SThreadParaObj
*
pThread
=
(
SThreadParaObj
*
)
arg
;
STableRecord
tableRecord
;
int
fd
;
pstr
+=
sprintf
(
buffer
,
"CREATE TABLE IF NOT EXISTS %s"
,
tableDes
->
name
);
char
tmpFileName
[
TSDB_FILENAME_LEN
*
4
]
=
{
0
};
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
pThread
->
threadIndex
);
fd
=
open
(
tmpFileName
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
==
-
1
)
{
fprintf
(
stderr
,
"taosDumpTableFp() failed to open temp file: %s
\n
"
,
tmpFileName
);
return
NULL
;
}
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
tableDes
->
cols
[
counter
].
note
[
0
]
!=
'\0'
)
break
;
FILE
*
fp
=
NULL
;
memset
(
tmpFileName
,
0
,
TSDB_FILENAME_LEN
+
128
)
;
if
(
counter
=
=
0
)
{
pstr
+=
sprintf
(
pstr
,
" (%s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
if
(
tsArguments
.
outpath
[
0
]
!
=
0
)
{
sprintf
(
tmpFileName
,
"%s/%s.tables.%d.sql"
,
tsArguments
.
outpath
,
pThread
->
dbName
,
pThread
->
threadIndex
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
sprintf
(
tmpFileName
,
"%s.tables.%d.sql"
,
pThread
->
dbName
,
pThread
->
threadIndex
);
}
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"(%d)"
,
tableDes
->
cols
[
counter
].
length
);
}
fp
=
fopen
(
tmpFileName
,
"w"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"failed to open file %s
\n
"
,
tmpFileName
);
return
NULL
;
}
count_temp
=
counter
;
memset
(
tmpFileName
,
0
,
TSDB_FILENAME_LEN
);
sprintf
(
tmpFileName
,
"use %s"
,
pThread
->
dbName
);
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
counter
==
count_temp
)
{
pstr
+=
sprintf
(
pstr
,
") TAGS (%s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
TAOS_RES
*
tmpResult
=
taos_query
(
pThread
->
taosCon
,
tmpFileName
);
int32_t
code
=
taos_errno
(
tmpResult
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"invalid database %s
\n
"
,
pThread
->
dbName
);
taos_free_result
(
tmpResult
);
return
NULL
;
}
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"(%d)"
,
tableDes
->
cols
[
counter
].
length
);
}
fprintf
(
fp
,
"USE %s
\n\n
"
,
pThread
->
dbName
);
while
(
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
))
>
0
)
{
taosDumpTable
(
tableRecord
.
name
,
tableRecord
.
metric
,
&
tsArguments
,
fp
,
pThread
->
taosCon
);
}
pstr
+=
sprintf
(
pstr
,
")"
);
taos_free_result
(
tmpResult
);
tclose
(
fd
);
fclose
(
fp
);
fprintf
(
fp
,
"%s
\n\n
"
,
buffer
)
;
return
NULL
;
}
void
taosDumpCreateMTableClause
(
STableDef
*
tableDes
,
char
*
metric
,
int
numOfCols
,
SDumpArguments
*
arguments
,
FILE
*
fp
)
{
char
*
pstr
=
NULL
;
pstr
=
buffer
;
int
counter
=
0
;
int
count_temp
=
0
;
pstr
+=
sprintf
(
buffer
,
"CREATE TABLE IF NOT EXISTS %s USING %s TAGS ("
,
tableDes
->
name
,
metric
);
static
void
taosStartDumpOutWorkThreads
(
struct
arguments
*
args
,
int32_t
numOfThread
,
char
*
dbName
)
{
pthread_attr_t
thattr
;
SThreadParaObj
*
threadObj
=
(
SThreadParaObj
*
)
calloc
(
numOfThread
,
sizeof
(
SThreadParaObj
));
for
(
int
t
=
0
;
t
<
numOfThread
;
++
t
)
{
SThreadParaObj
*
pThread
=
threadObj
+
t
;
pThread
->
threadIndex
=
t
;
pThread
->
totalThreads
=
numOfThread
;
strcpy
(
pThread
->
dbName
,
dbName
);
pThread
->
taosCon
=
taos_connect
(
args
->
host
,
args
->
user
,
args
->
password
,
NULL
,
args
->
port
);
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
tableDes
->
cols
[
counter
].
note
[
0
]
!=
'\0'
)
break
;
if
(
pThread
->
taosCon
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed connect to TDengine, error:%s
\n
"
,
pThread
->
threadIndex
,
taos_errstr
(
pThread
->
taosCon
));
exit
(
0
);
}
assert
(
counter
<
numOfCols
);
count_temp
=
counter
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
)
;
for
(;
counter
<
numOfCols
;
counter
++
)
{
TAOS_ROW
row
=
NULL
;
if
(
pthread_create
(
&
(
pThread
->
threadID
),
&
thattr
,
taosDumpOutWorkThreadFp
,
(
void
*
)
pThread
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed to start
\n
"
,
pThread
->
threadIndex
);
exit
(
0
);
}
}
sprintf
(
command
,
"select %s from %s limit 1"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
name
);
for
(
int32_t
t
=
0
;
t
<
numOfThread
;
++
t
)
{
pthread_join
(
threadObj
[
t
].
threadID
,
NULL
);
}
TAOS_RES
*
result
=
taos_query
(
taos
,
command
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, error: %s
\n
"
,
command
,
taos_errstr
(
result
));
return
;
for
(
int32_t
t
=
0
;
t
<
numOfThread
;
++
t
)
{
taos_close
(
threadObj
[
t
].
taosCon
);
}
free
(
threadObj
);
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
row
=
taos_fetch_row
(
result
);
switch
(
fields
[
0
].
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
sprintf
(
tableDes
->
cols
[
counter
].
note
,
"%d"
,
((((
int
)(
*
((
char
*
)
row
[
0
])))
==
1
)
?
1
:
0
));
break
;
case
TSDB_DATA_TYPE_TINYINT
:
sprintf
(
tableDes
->
cols
[
counter
].
note
,
"%d"
,
(
int
)(
*
((
char
*
)
row
[
0
])));
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
sprintf
(
tableDes
->
cols
[
counter
].
note
,
"%d"
,
(
int
)(
*
((
short
*
)
row
[
0
])));
break
;
case
TSDB_DATA_TYPE_INT
:
sprintf
(
tableDes
->
cols
[
counter
].
note
,
"%d"
,
*
((
int
*
)
row
[
0
]));
break
;
case
TSDB_DATA_TYPE_BIGINT
:
sprintf
(
tableDes
->
cols
[
counter
].
note
,
"%"
PRId64
""
,
*
((
int64_t
*
)
row
[
0
]));
break
;
case
TSDB_DATA_TYPE_FLOAT
:
sprintf
(
tableDes
->
cols
[
counter
].
note
,
"%f"
,
GET_FLOAT_VAL
(
row
[
0
]));
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
sprintf
(
tableDes
->
cols
[
counter
].
note
,
"%f"
,
GET_DOUBLE_VAL
(
row
[
0
]));
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
sprintf
(
tableDes
->
cols
[
counter
].
note
,
"%"
PRId64
""
,
*
(
int64_t
*
)
row
[
0
]);
break
;
case
TSDB_DATA_TYPE_BINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
default:
strncpy
(
tableDes
->
cols
[
counter
].
note
,
(
char
*
)
row
[
0
],
fields
[
0
].
bytes
);
break
;
}
taos_free_result
(
result
);
int32_t
taosDumpStable
(
char
*
table
,
FILE
*
fp
,
TAOS
*
taosCon
)
{
int
count
=
0
;
if
(
counter
!=
count_temp
)
{
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",
\'
%s
\'
"
,
tableDes
->
cols
[
counter
].
note
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s"
,
tableDes
->
cols
[
counter
].
note
);
}
}
else
{
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"
\'
%s
\'
"
,
tableDes
->
cols
[
counter
].
note
);
}
else
{
pstr
+=
sprintf
(
pstr
,
"%s"
,
tableDes
->
cols
[
counter
].
note
);
}
/* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */
STableDef
*
tableDes
=
(
STableDef
*
)
calloc
(
1
,
sizeof
(
STableDef
)
+
sizeof
(
SColDes
)
*
TSDB_MAX_COLUMNS
);
if
(
NULL
==
tableDes
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
exit
(
-
1
);
}
/* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar")
* == 0) { */
/* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */
/* } */
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
);
if
(
count
<
0
)
{
free
(
tableDes
);
fprintf
(
stderr
,
"failed to get stable schema
\n
"
);
exit
(
-
1
);
}
pstr
+=
sprintf
(
pstr
,
")"
);
taosDumpCreateTableClause
(
tableDes
,
count
,
fp
);
fprintf
(
fp
,
"%s
\n\n
"
,
buffer
);
free
(
tableDes
);
return
0
;
}
int
taosGetTableDes
(
char
*
table
,
STableDef
*
tableDes
)
{
TAOS_ROW
row
=
NULL
;
int
count
=
0
;
sprintf
(
command
,
"describe %s"
,
table
);
int32_t
taosDumpCreateSuperTableClause
(
TAOS
*
taosCon
,
char
*
dbName
,
FILE
*
fp
)
{
TAOS_ROW
row
;
int
fd
=
-
1
;
STableRecord
tableRecord
;
TAOS_RES
*
result
=
taos_query
(
taos
,
command
);
int32_t
code
=
taos_errno
(
result
);
char
*
tmpCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
exit
(
-
1
);
}
sprintf
(
tmpCommand
,
"use %s"
,
dbName
);
TAOS_RES
*
tmpResult
=
taos_query
(
taosCon
,
tmpCommand
);
int32_t
code
=
taos_errno
(
tmpResult
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, error: %s
\n
"
,
command
,
taos_errstr
(
result
));
taos_free_result
(
result
);
return
-
1
;
fprintf
(
stderr
,
"invalid database %s, error: %s
\n
"
,
dbName
,
taos_errstr
(
taosCon
));
free
(
tmpCommand
);
taos_free_result
(
tmpResult
);
exit
(
-
1
);
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
r
esult
);
taos_free_result
(
tmpR
esult
);
tstrncpy
(
tableDes
->
name
,
table
,
TSDB_COL_NAME_LEN
);
sprintf
(
tmpCommand
,
"show stables"
);
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
strncpy
(
tableDes
->
cols
[
count
].
field
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_FIELD_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_FIELD_INDEX
].
bytes
);
strncpy
(
tableDes
->
cols
[
count
].
type
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_TYPE_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_TYPE_INDEX
].
bytes
);
tableDes
->
cols
[
count
].
length
=
*
((
int
*
)
row
[
TSDB_DESCRIBE_METRIC_LENGTH_INDEX
]);
strncpy
(
tableDes
->
cols
[
count
].
note
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_NOTE_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_NOTE_INDEX
].
bytes
);
tmpResult
=
taos_query
(
taosCon
,
tmpCommand
);
code
=
taos_errno
(
tmpResult
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, error: %s
\n
"
,
tmpCommand
,
taos_errstr
(
taosCon
));
free
(
tmpCommand
);
taos_free_result
(
tmpResult
);
exit
(
-
1
);
}
taos_free_result
(
tmpResult
);
count
++
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpResult
);
char
tmpFileName
[
TSDB_FILENAME_LEN
+
1
];
memset
(
tmpFileName
,
0
,
TSDB_FILENAME_LEN
);
sprintf
(
tmpFileName
,
".stables.tmp"
);
fd
=
open
(
tmpFileName
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
==
-
1
)
{
fprintf
(
stderr
,
"failed to open temp file: %s
\n
"
,
tmpFileName
);
taos_free_result
(
tmpResult
);
free
(
tmpCommand
);
remove
(
".stables.tmp"
);
exit
(
-
1
);
}
taos_free_result
(
result
);
result
=
NULL
;
while
((
row
=
taos_fetch_row
(
tmpResult
))
!=
NULL
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_NAME_INDEX
],
fields
[
TSDB_SHOW_TABLES_NAME_INDEX
].
bytes
);
twrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
}
return
count
;
taos_free_result
(
tmpResult
);
lseek
(
fd
,
0
,
SEEK_SET
);
while
(
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
))
>
0
)
{
(
void
)
taosDumpStable
(
tableRecord
.
name
,
fp
,
taosCon
);
}
tclose
(
fd
);
remove
(
".stables.tmp"
);
free
(
tmpCommand
);
return
0
;
}
int32_t
taosDumpTable
(
char
*
table
,
char
*
metric
,
SDumpArguments
*
arguments
,
FILE
*
fp
)
{
int
count
=
0
;
STableDef
*
tableDes
=
(
STableDef
*
)
calloc
(
1
,
sizeof
(
STableDef
)
+
sizeof
(
SColDes
)
*
TSDB_MAX_COLUMNS
);
int
taosDumpDb
(
SDbInfo
*
dbInfo
,
struct
arguments
*
arguments
,
FILE
*
fp
,
TAOS
*
taosCon
)
{
TAOS_ROW
row
;
int
fd
=
-
1
;
STableRecord
tableRecord
;
if
(
metric
!=
NULL
&&
metric
[
0
]
!=
'\0'
)
{
// dump metric definition
count
=
taosGetTableDes
(
metric
,
tableDes
);
taosDumpCreateDbClause
(
dbInfo
,
arguments
->
with_property
,
fp
);
if
(
count
<
0
)
{
free
(
tableDes
);
char
*
tmpCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
-
1
;
}
taosDumpCreateTableClause
(
tableDes
,
count
,
arguments
,
fp
);
sprintf
(
tmpCommand
,
"use %s"
,
dbInfo
->
name
);
memset
(
tableDes
,
0
,
sizeof
(
STableDef
)
+
sizeof
(
SColDes
)
*
TSDB_MAX_COLUMNS
);
TAOS_RES
*
tmpResult
=
taos_query
(
taosCon
,
tmpCommand
);
int32_t
code
=
taos_errno
(
tmpResult
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"invalid database %s
\n
"
,
dbInfo
->
name
);
free
(
tmpCommand
);
taos_free_result
(
tmpResult
);
return
-
1
;
}
taos_free_result
(
tmpResult
);
fprintf
(
fp
,
"USE %s
\n\n
"
,
dbInfo
->
name
);
count
=
taosGetTableDes
(
table
,
tableDes
);
(
void
)
taosDumpCreateSuperTableClause
(
taosCon
,
dbInfo
->
name
,
fp
);
if
(
count
<
0
)
{
free
(
tableDes
);
sprintf
(
tmpCommand
,
"show tables"
);
tmpResult
=
taos_query
(
taosCon
,
tmpCommand
);
code
=
taos_errno
(
tmpResult
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tmpCommand
);
free
(
tmpCommand
);
taos_free_result
(
tmpResult
);
return
-
1
;
}
taosDumpCreateMTableClause
(
tableDes
,
metric
,
count
,
arguments
,
fp
);
}
else
{
// dump table definition
count
=
taosGetTableDes
(
table
,
tableDes
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpResult
);
if
(
count
<
0
)
{
free
(
tableDes
);
int32_t
numOfTable
=
0
;
int32_t
numOfThread
=
0
;
char
tmpFileName
[
TSDB_FILENAME_LEN
+
1
];
while
((
row
=
taos_fetch_row
(
tmpResult
))
!=
NULL
)
{
if
(
0
==
numOfTable
)
{
memset
(
tmpFileName
,
0
,
TSDB_FILENAME_LEN
);
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
numOfThread
);
fd
=
open
(
tmpFileName
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
==
-
1
)
{
fprintf
(
stderr
,
"failed to open temp file: %s
\n
"
,
tmpFileName
);
taos_free_result
(
tmpResult
);
for
(
int32_t
loopCnt
=
0
;
loopCnt
<
numOfThread
;
loopCnt
++
)
{
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
loopCnt
);
remove
(
tmpFileName
);
}
free
(
tmpCommand
);
return
-
1
;
}
taosDumpCreateTableClause
(
tableDes
,
count
,
arguments
,
fp
)
;
numOfThread
++
;
}
free
(
tableDes
);
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_NAME_INDEX
],
fields
[
TSDB_SHOW_TABLES_NAME_INDEX
].
bytes
);
strncpy
(
tableRecord
.
metric
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_METRIC_INDEX
],
fields
[
TSDB_SHOW_TABLES_METRIC_INDEX
].
bytes
);
twrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
return
taosDumpTableData
(
fp
,
table
,
arguments
);
numOfTable
++
;
if
(
numOfTable
>=
arguments
->
table_batch
)
{
numOfTable
=
0
;
tclose
(
fd
);
fd
=
-
1
;
}
}
tclose
(
fd
);
fd
=
-
1
;
taos_free_result
(
tmpResult
);
// start multi threads to dumpout
taosStartDumpOutWorkThreads
(
arguments
,
numOfThread
,
dbInfo
->
name
);
for
(
int
loopCnt
=
0
;
loopCnt
<
numOfThread
;
loopCnt
++
)
{
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
loopCnt
);
remove
(
tmpFileName
);
}
free
(
tmpCommand
);
return
0
;
}
int32_t
taosDumpMetric
(
char
*
metric
,
SDumpArguments
*
arguments
,
FILE
*
fp
)
{
TAOS_ROW
row
=
NULL
;
int
fd
=
-
1
;
STableRecord
tableRecord
;
void
taosDumpCreateTableClause
(
STableDef
*
tableDes
,
int
numOfCols
,
FILE
*
fp
)
{
int
counter
=
0
;
int
count_temp
=
0
;
char
*
tmpBuf
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpBuf
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
;
}
//tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN)
;
char
*
pstr
=
tmpBuf
;
sprintf
(
command
,
"select tbname from %s"
,
metric
);
TAOS_RES
*
result
=
taos_query
(
taos
,
command
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, error: %s
\n
"
,
command
,
taos_errstr
(
result
));
taos_free_result
(
result
);
return
-
1
;
pstr
+=
sprintf
(
tmpBuf
,
"CREATE TABLE IF NOT EXISTS %s"
,
tableDes
->
name
);
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
tableDes
->
cols
[
counter
].
note
[
0
]
!=
'\0'
)
break
;
if
(
counter
==
0
)
{
pstr
+=
sprintf
(
pstr
,
" (%s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
fd
=
open
(
".table.tmp"
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
<
0
)
{
fprintf
(
stderr
,
"failed to open temp file"
);
return
-
1
;
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"(%d)"
,
tableDes
->
cols
[
counter
].
length
);
}
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
)
;
count_temp
=
counter
;
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
tstrncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
0
],
fields
[
0
].
bytes
);
tstrncpy
(
tableRecord
.
metric
,
metric
,
TSDB_TABLE_NAME_LEN
);
twrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
)
);
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
counter
==
count_temp
)
{
pstr
+=
sprintf
(
pstr
,
") TAGS (%s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
taos_free_result
(
result
);
result
=
NULL
;
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"(%d)"
,
tableDes
->
cols
[
counter
].
length
);
}
}
(
void
)
lseek
(
fd
,
0
,
SEEK_SET
);
pstr
+=
sprintf
(
pstr
,
");"
);
//STableRecord tableInfo;
char
tableName
[
TSDB_TABLE_NAME_LEN
]
;
char
metricName
[
TSDB_TABLE_NAME_LEN
];
ssize_t
ret
;
while
(
1
)
{
//memset(&tableInfo, 0, sizeof(STableRecord));
memset
(
tableName
,
0
,
TSDB_TABLE_NAME_LEN
);
memset
(
metricName
,
0
,
TSDB_TABLE_NAME_LEN
);
//ssize_t ret = read(fd, &tableInfo, sizeof(STableRecord));
//if (ret <= 0) break;
ret
=
read
(
fd
,
tableName
,
TSDB_TABLE_NAME_LEN
);
if
(
ret
<=
0
)
break
;
fprintf
(
fp
,
"%s
\n
"
,
tmpBuf
);
free
(
tmpBuf
);
}
ret
=
read
(
fd
,
metricName
,
TSDB_TABLE_NAME_LEN
);
if
(
ret
<=
0
)
break
;
void
taosDumpCreateMTableClause
(
STableDef
*
tableDes
,
char
*
metric
,
int
numOfCols
,
FILE
*
fp
)
{
int
counter
=
0
;
int
count_temp
=
0
;
//tableInfo.name[sizeof(tableInfo.name) - 1] = 0;
//tableInfo.metric[sizeof(tableInfo.metric) - 1] = 0;
//taosDumpTable(tableInfo.name, tableInfo.metric, arguments, fp);
//tstrncpy(tableName, tableInfo.name, TSDB_TABLE_NAME_LEN-1);
//tstrncpy(metricName, tableInfo.metric, TSDB_TABLE_NAME_LEN-1);
taosDumpTable
(
tableName
,
metricName
,
arguments
,
fp
);
char
*
tmpBuf
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpBuf
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
;
}
c
lose
(
fd
)
;
(
void
)
remove
(
".table.tmp"
)
;
c
har
*
pstr
=
NULL
;
pstr
=
tmpBuf
;
return
0
;
pstr
+=
sprintf
(
tmpBuf
,
"CREATE TABLE IF NOT EXISTS %s USING %s TAGS ("
,
tableDes
->
name
,
metric
);
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
tableDes
->
cols
[
counter
].
note
[
0
]
!=
'\0'
)
break
;
}
assert
(
counter
<
numOfCols
);
count_temp
=
counter
;
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
counter
!=
count_temp
)
{
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",
\'
%s
\'
"
,
tableDes
->
cols
[
counter
].
note
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s"
,
tableDes
->
cols
[
counter
].
note
);
}
}
else
{
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"
\'
%s
\'
"
,
tableDes
->
cols
[
counter
].
note
);
}
else
{
pstr
+=
sprintf
(
pstr
,
"%s"
,
tableDes
->
cols
[
counter
].
note
);
}
/* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */
}
/* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar")
* == 0) { */
/* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */
/* } */
}
pstr
+=
sprintf
(
pstr
,
");"
);
fprintf
(
fp
,
"%s
\n
"
,
tmpBuf
);
free
(
tmpBuf
);
}
int
taosDumpTableData
(
FILE
*
fp
,
char
*
tbname
,
SDumpArguments
*
arguments
)
{
int
taosDumpTableData
(
FILE
*
fp
,
char
*
tbname
,
struct
arguments
*
arguments
,
TAOS
*
taosCon
)
{
/* char temp[MAX_COMMAND_SIZE] = "\0"; */
int64_t
totalRows
=
0
;
int
count
=
0
;
char
*
pstr
=
NULL
;
TAOS_ROW
row
=
NULL
;
int
numFields
=
0
;
char
*
tbuf
=
NULL
;
if
(
arguments
->
schemaonly
)
return
0
;
char
*
tmpCommand
=
(
char
*
)
calloc
(
1
,
COMMAND_SIZE
);
if
(
tmpCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
-
1
;
}
sprintf
(
command
,
"select * from %s where _c0 >= %"
PRId64
" and _c0 <= %"
PRId64
" order by _c0 asc"
,
tbname
,
arguments
->
start_time
,
char
*
tmpBuffer
=
(
char
*
)
calloc
(
1
,
COMMAND_SIZE
);
if
(
tmpBuffer
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
free
(
tmpCommand
);
return
-
1
;
}
pstr
=
tmpBuffer
;
if
(
arguments
->
schemaonly
)
{
free
(
tmpCommand
);
free
(
tmpBuffer
);
return
0
;
}
sprintf
(
tmpCommand
,
"select * from %s where _c0 >= %"
PRId64
" and _c0 <= %"
PRId64
" order by _c0 asc"
,
tbname
,
arguments
->
start_time
,
arguments
->
end_time
);
TAOS_RES
*
result
=
taos_query
(
taos
,
command
);
int32_t
code
=
taos_errno
(
r
esult
);
TAOS_RES
*
tmpResult
=
taos_query
(
taosCon
,
tmpCommand
);
int32_t
code
=
taos_errno
(
tmpR
esult
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, reason: %s
\n
"
,
command
,
taos_errstr
(
result
));
taos_free_result
(
result
);
fprintf
(
stderr
,
"failed to run command %s, reason: %s
\n
"
,
tmpCommand
,
taos_errstr
(
taosCon
));
free
(
tmpCommand
);
free
(
tmpBuffer
);
taos_free_result
(
tmpResult
);
return
-
1
;
}
numFields
=
taos_field_count
(
result
);
numFields
=
taos_field_count
(
taosCon
);
assert
(
numFields
>
0
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
r
esult
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpR
esult
);
tbuf
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tbuf
==
NULL
)
{
fprintf
(
stderr
,
"No enough memory
\n
"
);
free
(
tmpCommand
);
free
(
tmpBuffer
);
taos_free_result
(
tmpResult
);
return
-
1
;
}
char
sqlStr
[
8
]
=
"
\0
"
;
if
(
arguments
->
mysqlFlag
)
{
sprintf
(
sqlStr
,
"INSERT"
);
}
else
{
sprintf
(
sqlStr
,
"IMPORT"
);
}
int
rowFlag
=
0
;
count
=
0
;
while
((
row
=
taos_fetch_row
(
r
esult
))
!=
NULL
)
{
pstr
=
b
uffer
;
while
((
row
=
taos_fetch_row
(
tmpR
esult
))
!=
NULL
)
{
pstr
=
tmpB
uffer
;
if
(
count
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"
INSERT INTO %s VALUES ("
,
tbname
);
pstr
+=
sprintf
(
pstr
,
"
%s INTO %s VALUES ("
,
sqlStr
,
tbname
);
}
else
{
if
(
arguments
->
mysqlFlag
)
{
if
(
0
==
rowFlag
)
{
pstr
+=
sprintf
(
pstr
,
"("
);
rowFlag
++
;
}
else
{
pstr
+=
sprintf
(
pstr
,
", ("
);
}
}
else
{
pstr
+=
sprintf
(
pstr
,
"("
);
}
}
for
(
int
col
=
0
;
col
<
numFields
;
col
++
)
{
...
...
@@ -1003,7 +1424,7 @@ int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) {
pstr
+=
sprintf
(
pstr
,
"%d"
,
*
((
int
*
)
row
[
col
]));
break
;
case
TSDB_DATA_TYPE_BIGINT
:
pstr
+=
sprintf
(
pstr
,
"%"
PRId64
,
*
((
int64_t
*
)
row
[
col
]));
pstr
+=
sprintf
(
pstr
,
"%"
PRId64
""
,
*
((
int64_t
*
)
row
[
col
]));
break
;
case
TSDB_DATA_TYPE_FLOAT
:
pstr
+=
sprintf
(
pstr
,
"%f"
,
GET_FLOAT_VAL
(
row
[
col
]));
...
...
@@ -1022,146 +1443,407 @@ int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) {
pstr
+=
sprintf
(
pstr
,
"
\'
%s
\'
"
,
tbuf
);
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
pstr
+=
sprintf
(
pstr
,
"%"
PRId64
,
*
(
int64_t
*
)
row
[
col
]);
if
(
!
arguments
->
mysqlFlag
)
{
pstr
+=
sprintf
(
pstr
,
"%"
PRId64
""
,
*
(
int64_t
*
)
row
[
col
]);
}
else
{
char
buf
[
64
]
=
"
\0
"
;
int64_t
ts
=
*
((
int64_t
*
)
row
[
col
]);
time_t
tt
=
(
time_t
)(
ts
/
1000
);
struct
tm
*
ptm
=
localtime
(
&
tt
);
strftime
(
buf
,
64
,
"%y-%m-%d %H:%M:%S"
,
ptm
);
pstr
+=
sprintf
(
pstr
,
"
\'
%s.%03d
\'
"
,
buf
,
(
int
)(
ts
%
1000
));
}
break
;
default:
break
;
}
}
sprintf
(
pstr
,
")"
);
count
++
;
fprintf
(
fp
,
"%s"
,
buffer
);
pstr
+=
sprintf
(
pstr
,
") "
);
totalRows
++
;
count
++
;
fprintf
(
fp
,
"%s"
,
tmpBuffer
);
if
(
count
>=
arguments
->
data_batch
)
{
fprintf
(
fp
,
";
\n
"
);
count
=
0
;
}
//else {
//fprintf(fp, "\\\n");
//}
}
atomic_add_fetch_64
(
&
totalDumpOutRows
,
totalRows
);
fprintf
(
fp
,
"
\n
"
);
if
(
tbuf
)
{
free
(
tbuf
);
}
taos_free_result
(
tmpResult
);
tmpResult
=
NULL
;
free
(
tmpCommand
);
free
(
tmpBuffer
);
return
0
;
}
int
taosCheckParam
(
struct
arguments
*
arguments
)
{
if
(
arguments
->
all_databases
&&
arguments
->
databases
)
{
fprintf
(
stderr
,
"conflict option --all-databases and --databases
\n
"
);
return
-
1
;
}
if
(
arguments
->
start_time
>
arguments
->
end_time
)
{
fprintf
(
stderr
,
"start time is larger than end time
\n
"
);
return
-
1
;
}
if
(
arguments
->
arg_list_len
==
0
)
{
if
((
!
arguments
->
all_databases
)
&&
(
!
arguments
->
isDumpIn
))
{
fprintf
(
stderr
,
"taosdump requires parameters
\n
"
);
return
-
1
;
}
}
/*
if (arguments->isDumpIn && (strcmp(arguments->outpath, DEFAULT_DUMP_FILE) != 0)) {
fprintf(stderr, "duplicate parameter input and output file path\n");
return -1;
}
*/
if
(
!
arguments
->
isDumpIn
&&
arguments
->
encode
!=
NULL
)
{
fprintf
(
stderr
,
"invalid option in dump out
\n
"
);
return
-
1
;
}
if
(
arguments
->
table_batch
<=
0
)
{
fprintf
(
stderr
,
"invalid option in dump out
\n
"
);
return
-
1
;
}
return
0
;
}
bool
isEmptyCommand
(
char
*
cmd
)
{
char
*
pchar
=
cmd
;
while
(
*
pchar
!=
'\0'
)
{
if
(
*
pchar
!=
' '
)
return
false
;
pchar
++
;
}
return
true
;
}
void
taosReplaceCtrlChar
(
char
*
str
)
{
_Bool
ctrlOn
=
false
;
char
*
pstr
=
NULL
;
for
(
pstr
=
str
;
*
str
!=
'\0'
;
++
str
)
{
if
(
ctrlOn
)
{
switch
(
*
str
)
{
case
'n'
:
*
pstr
=
'\n'
;
pstr
++
;
break
;
case
'r'
:
*
pstr
=
'\r'
;
pstr
++
;
break
;
case
't'
:
*
pstr
=
'\t'
;
pstr
++
;
break
;
case
'\\'
:
*
pstr
=
'\\'
;
pstr
++
;
break
;
case
'\''
:
*
pstr
=
'\''
;
pstr
++
;
break
;
default:
break
;
}
ctrlOn
=
false
;
}
else
{
if
(
*
str
==
'\\'
)
{
ctrlOn
=
true
;
}
else
{
*
pstr
=
*
str
;
pstr
++
;
}
}
}
*
pstr
=
'\0'
;
}
char
*
ascii_literal_list
[]
=
{
"
\\
x00"
,
"
\\
x01"
,
"
\\
x02"
,
"
\\
x03"
,
"
\\
x04"
,
"
\\
x05"
,
"
\\
x06"
,
"
\\
x07"
,
"
\\
x08"
,
"
\\
t"
,
"
\\
n"
,
"
\\
x0b"
,
"
\\
x0c"
,
"
\\
r"
,
"
\\
x0e"
,
"
\\
x0f"
,
"
\\
x10"
,
"
\\
x11"
,
"
\\
x12"
,
"
\\
x13"
,
"
\\
x14"
,
"
\\
x15"
,
"
\\
x16"
,
"
\\
x17"
,
"
\\
x18"
,
"
\\
x19"
,
"
\\
x1a"
,
"
\\
x1b"
,
"
\\
x1c"
,
"
\\
x1d"
,
"
\\
x1e"
,
"
\\
x1f"
,
" "
,
"!"
,
"
\\\"
"
,
"#"
,
"$"
,
"%"
,
"&"
,
"
\\
'"
,
"("
,
")"
,
"*"
,
"+"
,
","
,
"-"
,
"."
,
"/"
,
"0"
,
"1"
,
"2"
,
"3"
,
"4"
,
"5"
,
"6"
,
"7"
,
"8"
,
"9"
,
":"
,
";"
,
"<"
,
"="
,
">"
,
"?"
,
"@"
,
"A"
,
"B"
,
"C"
,
"D"
,
"E"
,
"F"
,
"G"
,
"H"
,
"I"
,
"J"
,
"K"
,
"L"
,
"M"
,
"N"
,
"O"
,
"P"
,
"Q"
,
"R"
,
"S"
,
"T"
,
"U"
,
"V"
,
"W"
,
"X"
,
"Y"
,
"Z"
,
"["
,
"
\\\\
"
,
"]"
,
"^"
,
"_"
,
"`"
,
"a"
,
"b"
,
"c"
,
"d"
,
"e"
,
"f"
,
"g"
,
"h"
,
"i"
,
"j"
,
"k"
,
"l"
,
"m"
,
"n"
,
"o"
,
"p"
,
"q"
,
"r"
,
"s"
,
"t"
,
"u"
,
"v"
,
"w"
,
"x"
,
"y"
,
"z"
,
"{"
,
"|"
,
"}"
,
"~"
,
"
\\
x7f"
,
"
\\
x80"
,
"
\\
x81"
,
"
\\
x82"
,
"
\\
x83"
,
"
\\
x84"
,
"
\\
x85"
,
"
\\
x86"
,
"
\\
x87"
,
"
\\
x88"
,
"
\\
x89"
,
"
\\
x8a"
,
"
\\
x8b"
,
"
\\
x8c"
,
"
\\
x8d"
,
"
\\
x8e"
,
"
\\
x8f"
,
"
\\
x90"
,
"
\\
x91"
,
"
\\
x92"
,
"
\\
x93"
,
"
\\
x94"
,
"
\\
x95"
,
"
\\
x96"
,
"
\\
x97"
,
"
\\
x98"
,
"
\\
x99"
,
"
\\
x9a"
,
"
\\
x9b"
,
"
\\
x9c"
,
"
\\
x9d"
,
"
\\
x9e"
,
"
\\
x9f"
,
"
\\
xa0"
,
"
\\
xa1"
,
"
\\
xa2"
,
"
\\
xa3"
,
"
\\
xa4"
,
"
\\
xa5"
,
"
\\
xa6"
,
"
\\
xa7"
,
"
\\
xa8"
,
"
\\
xa9"
,
"
\\
xaa"
,
"
\\
xab"
,
"
\\
xac"
,
"
\\
xad"
,
"
\\
xae"
,
"
\\
xaf"
,
"
\\
xb0"
,
"
\\
xb1"
,
"
\\
xb2"
,
"
\\
xb3"
,
"
\\
xb4"
,
"
\\
xb5"
,
"
\\
xb6"
,
"
\\
xb7"
,
"
\\
xb8"
,
"
\\
xb9"
,
"
\\
xba"
,
"
\\
xbb"
,
"
\\
xbc"
,
"
\\
xbd"
,
"
\\
xbe"
,
"
\\
xbf"
,
"
\\
xc0"
,
"
\\
xc1"
,
"
\\
xc2"
,
"
\\
xc3"
,
"
\\
xc4"
,
"
\\
xc5"
,
"
\\
xc6"
,
"
\\
xc7"
,
"
\\
xc8"
,
"
\\
xc9"
,
"
\\
xca"
,
"
\\
xcb"
,
"
\\
xcc"
,
"
\\
xcd"
,
"
\\
xce"
,
"
\\
xcf"
,
"
\\
xd0"
,
"
\\
xd1"
,
"
\\
xd2"
,
"
\\
xd3"
,
"
\\
xd4"
,
"
\\
xd5"
,
"
\\
xd6"
,
"
\\
xd7"
,
"
\\
xd8"
,
"
\\
xd9"
,
"
\\
xda"
,
"
\\
xdb"
,
"
\\
xdc"
,
"
\\
xdd"
,
"
\\
xde"
,
"
\\
xdf"
,
"
\\
xe0"
,
"
\\
xe1"
,
"
\\
xe2"
,
"
\\
xe3"
,
"
\\
xe4"
,
"
\\
xe5"
,
"
\\
xe6"
,
"
\\
xe7"
,
"
\\
xe8"
,
"
\\
xe9"
,
"
\\
xea"
,
"
\\
xeb"
,
"
\\
xec"
,
"
\\
xed"
,
"
\\
xee"
,
"
\\
xef"
,
"
\\
xf0"
,
"
\\
xf1"
,
"
\\
xf2"
,
"
\\
xf3"
,
"
\\
xf4"
,
"
\\
xf5"
,
"
\\
xf6"
,
"
\\
xf7"
,
"
\\
xf8"
,
"
\\
xf9"
,
"
\\
xfa"
,
"
\\
xfb"
,
"
\\
xfc"
,
"
\\
xfd"
,
"
\\
xfe"
,
"
\\
xff"
};
int
converStringToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
)
{
char
*
pstr
=
str
;
char
*
pbuf
=
buf
;
while
(
size
>
0
)
{
if
(
*
pstr
==
'\0'
)
break
;
pbuf
=
stpcpy
(
pbuf
,
ascii_literal_list
[((
uint8_t
)(
*
pstr
))]);
pstr
++
;
size
--
;
}
*
pbuf
=
'\0'
;
return
0
;
}
int
convertNCharToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
)
{
char
*
pstr
=
str
;
char
*
pbuf
=
buf
;
// TODO
wchar_t
wc
;
while
(
size
>
0
)
{
if
(
*
pstr
==
'\0'
)
break
;
int
byte_width
=
mbtowc
(
&
wc
,
pstr
,
MB_CUR_MAX
);
if
((
int
)
wc
<
256
)
{
pbuf
=
stpcpy
(
pbuf
,
ascii_literal_list
[(
int
)
wc
]);
}
else
{
memcpy
(
pbuf
,
pstr
,
byte_width
);
pbuf
+=
byte_width
;
}
pstr
+=
byte_width
;
}
*
pbuf
=
'\0'
;
return
0
;
}
void
taosDumpCharset
(
FILE
*
fp
)
{
char
charsetline
[
256
];
fseek
(
fp
,
0
,
SEEK_SET
);
sprintf
(
charsetline
,
"#!%s
\n
"
,
tsCharset
);
fwrite
(
charsetline
,
strlen
(
charsetline
),
1
,
fp
);
}
void
taosLoadFileCharset
(
FILE
*
fp
,
char
*
fcharset
)
{
char
*
line
=
NULL
;
size_t
line_size
=
0
;
fseek
(
fp
,
0
,
SEEK_SET
);
ssize_t
size
=
getline
(
&
line
,
&
line_size
,
fp
);
if
(
size
<=
2
)
{
goto
_exit_no_charset
;
}
if
(
strncmp
(
line
,
"#!"
,
2
)
!=
0
)
{
goto
_exit_no_charset
;
}
if
(
line
[
size
-
1
]
==
'\n'
)
{
line
[
size
-
1
]
=
'\0'
;
size
--
;
}
strcpy
(
fcharset
,
line
+
2
);
tfree
(
line
);
return
;
if
(
count
>=
arguments
->
data_batch
)
{
fprintf
(
fp
,
"
\n
"
);
count
=
0
;
}
else
{
fprintf
(
fp
,
"
\\\n
"
);
_exit_no_charset:
fseek
(
fp
,
0
,
SEEK_SET
);
*
fcharset
=
'\0'
;
tfree
(
line
);
return
;
}
// ======== dumpIn support multi threads functions ================================//
static
char
**
tsDumpInSqlFiles
=
NULL
;
static
int32_t
tsSqlFileNum
=
0
;
static
char
tsDbSqlFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
static
char
tsfCharset
[
64
]
=
{
0
};
static
int
taosGetFilesNum
(
const
char
*
directoryName
,
const
char
*
prefix
)
{
char
cmd
[
1024
]
=
{
0
};
sprintf
(
cmd
,
"ls %s/*.%s | wc -l "
,
directoryName
,
prefix
);
FILE
*
fp
=
popen
(
cmd
,
"r"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, error:%s
\n
"
,
cmd
,
strerror
(
errno
));
exit
(
0
);
}
int
fileNum
=
0
;
if
(
fscanf
(
fp
,
"%d"
,
&
fileNum
)
!=
1
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, parse result error
\n
"
,
cmd
);
exit
(
0
);
}
fprintf
(
fp
,
"
\n
"
);
if
(
fileNum
<=
0
)
{
fprintf
(
stderr
,
"ERROR: directory:%s is empry
\n
"
,
directoryName
);
exit
(
0
);
}
if
(
tbuf
)
free
(
tbuf
);
taos_free_result
(
result
);
result
=
NULL
;
return
0
;
pclose
(
fp
);
return
fileNum
;
}
int
taosCheckParam
(
SDumpArguments
*
arguments
)
{
if
(
arguments
->
all_databases
&&
arguments
->
databases
)
{
fprintf
(
stderr
,
"conflict option --all-databases and --databases
\n
"
);
return
-
1
;
static
void
taosParseDirectory
(
const
char
*
directoryName
,
const
char
*
prefix
,
char
**
fileArray
,
int
totalFiles
)
{
char
cmd
[
1024
]
=
{
0
};
sprintf
(
cmd
,
"ls %s/*.%s | sort"
,
directoryName
,
prefix
);
FILE
*
fp
=
popen
(
cmd
,
"r"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, error:%s
\n
"
,
cmd
,
strerror
(
errno
));
exit
(
0
);
}
if
(
arguments
->
start_time
>
arguments
->
end_time
)
{
fprintf
(
stderr
,
"start time is larger than end time
\n
"
);
return
-
1
;
int
fileNum
=
0
;
while
(
fscanf
(
fp
,
"%s"
,
fileArray
[
fileNum
++
]))
{
if
(
strcmp
(
fileArray
[
fileNum
-
1
],
tsDbSqlFile
)
==
0
)
{
fileNum
--
;
}
if
(
arguments
->
arg_list_len
==
0
)
{
if
((
!
arguments
->
all_databases
)
&&
(
!
arguments
->
isDumpIn
))
{
fprintf
(
stderr
,
"taosdump requires parameters
\n
"
);
return
-
1
;
if
(
fileNum
>=
totalFiles
)
{
break
;
}
}
if
(
arguments
->
isDumpIn
&&
(
strcmp
(
arguments
->
output
,
DEFAULT_DUMP_FILE
)
!=
0
)
)
{
fprintf
(
stderr
,
"
duplicate parameter input and output file
\n
"
);
return
-
1
;
if
(
fileNum
!=
totalFiles
)
{
fprintf
(
stderr
,
"
ERROR: directory:%s changed while read
\n
"
,
directoryName
);
exit
(
0
)
;
}
if
(
!
arguments
->
isDumpIn
&&
arguments
->
encode
!=
NULL
)
{
fprintf
(
stderr
,
"invalid option in dump out
\n
"
);
return
-
1
;
pclose
(
fp
);
}
static
void
taosCheckTablesSQLFile
(
const
char
*
directoryName
)
{
char
cmd
[
1024
]
=
{
0
};
sprintf
(
cmd
,
"ls %s/dbs.sql"
,
directoryName
);
FILE
*
fp
=
popen
(
cmd
,
"r"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, error:%s
\n
"
,
cmd
,
strerror
(
errno
));
exit
(
0
);
}
return
0
;
while
(
fscanf
(
fp
,
"%s"
,
tsDbSqlFile
))
{
break
;
}
pclose
(
fp
);
}
bool
isEmptyCommand
(
char
*
cmd
)
{
char
*
pchar
=
cmd
;
static
void
taosMallocSQLFiles
()
{
tsDumpInSqlFiles
=
(
char
**
)
calloc
(
tsSqlFileNum
,
sizeof
(
char
*
));
for
(
int
i
=
0
;
i
<
tsSqlFileNum
;
i
++
)
{
tsDumpInSqlFiles
[
i
]
=
calloc
(
1
,
TSDB_FILENAME_LEN
);
}
}
while
(
*
pchar
!=
'\0'
)
{
if
(
*
pchar
!=
' '
)
return
false
;
pchar
++
;
static
void
taosFreeSQLFiles
()
{
for
(
int
i
=
0
;
i
<
tsSqlFileNum
;
i
++
)
{
tfree
(
tsDumpInSqlFiles
[
i
]);
}
tfree
(
tsDumpInSqlFiles
);
}
return
true
;
static
void
taosGetDirectoryFileList
(
char
*
inputDir
)
{
struct
stat
fileStat
;
if
(
stat
(
inputDir
,
&
fileStat
)
<
0
)
{
fprintf
(
stderr
,
"ERROR: %s not exist
\n
"
,
inputDir
);
exit
(
0
);
}
if
(
fileStat
.
st_mode
&
S_IFDIR
)
{
taosCheckTablesSQLFile
(
inputDir
);
tsSqlFileNum
=
taosGetFilesNum
(
inputDir
,
"sql"
);
int
totalSQLFileNum
=
tsSqlFileNum
;
if
(
tsDbSqlFile
[
0
]
!=
0
)
{
tsSqlFileNum
--
;
}
taosMallocSQLFiles
();
taosParseDirectory
(
inputDir
,
"sql"
,
tsDumpInSqlFiles
,
tsSqlFileNum
);
fprintf
(
stdout
,
"
\n
start to dispose %d files in %s
\n
"
,
totalSQLFileNum
,
inputDir
);
}
else
{
fprintf
(
stderr
,
"ERROR: %s is not a directory
\n
"
,
inputDir
);
exit
(
0
);
}
}
void
taosReplaceCtrlChar
(
char
*
str
)
{
_Bool
ctrlOn
=
false
;
char
*
pstr
=
NULL
;
static
FILE
*
taosOpenDumpInFile
(
char
*
fptr
)
{
wordexp_t
full_path
;
for
(
pstr
=
str
;
*
str
!=
'\0'
;
++
str
)
{
if
(
ctrlOn
)
{
switch
(
*
str
)
{
case
'n'
:
*
pstr
=
'\n'
;
pstr
++
;
break
;
case
'r'
:
*
pstr
=
'\r'
;
pstr
++
;
break
;
case
't'
:
*
pstr
=
'\t'
;
pstr
++
;
break
;
case
'\\'
:
*
pstr
=
'\\'
;
pstr
++
;
break
;
case
'\''
:
*
pstr
=
'\''
;
pstr
++
;
break
;
default:
break
;
if
(
wordexp
(
fptr
,
&
full_path
,
0
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: illegal file name: %s
\n
"
,
fptr
);
return
NULL
;
}
ctrlOn
=
false
;
}
else
{
if
(
*
str
==
'\\'
)
{
ctrlOn
=
true
;
}
else
{
*
pstr
=
*
str
;
pstr
++
;
char
*
fname
=
full_path
.
we_wordv
[
0
];
if
(
access
(
fname
,
F_OK
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: file %s is not exist
\n
"
,
fptr
);
wordfree
(
&
full_path
);
return
NULL
;
}
if
(
access
(
fname
,
R_OK
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: file %s is not readable
\n
"
,
fptr
);
wordfree
(
&
full_path
);
return
NULL
;
}
FILE
*
f
=
fopen
(
fname
,
"r"
);
if
(
f
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to open file %s
\n
"
,
fname
);
wordfree
(
&
full_path
);
return
NULL
;
}
*
pstr
=
'\0'
;
}
wordfree
(
&
full_path
);
int
taosDumpIn
(
SDumpArguments
*
arguments
)
{
assert
(
arguments
->
isDumpIn
);
return
f
;
}
int
taosDumpInOneFile_old
(
TAOS
*
taos
,
FILE
*
fp
,
char
*
fcharset
,
char
*
encode
)
{
char
*
command
=
NULL
;
char
*
lcommand
=
NULL
;
int
tsize
=
0
;
FILE
*
fp
=
NULL
;
char
*
line
=
NULL
;
char
*
line
=
NULL
;
_Bool
isRun
=
true
;
size_t
line_size
=
0
;
char
*
pstr
=
NULL
,
*
lstr
=
NULL
;
iconv_t
cd
=
(
iconv_t
)
-
1
;
char
*
pstr
=
NULL
;
char
*
lstr
=
NULL
;
size_t
inbytesleft
=
0
;
size_t
outbytesleft
=
COMMAND_SIZE
;
char
fcharset
[
64
];
char
*
tcommand
=
NULL
;
fp
=
fopen
(
arguments
->
input
,
"r"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"failed to open input file %s
\n
"
,
arguments
->
input
);
return
-
1
;
}
taosLoadFileCharset
(
fp
,
fcharset
);
taos
=
taos_connect
(
arguments
->
host
,
arguments
->
user
,
arguments
->
password
,
NULL
,
arguments
->
port
);
if
(
taos
==
NULL
)
{
fprintf
(
stderr
,
"failed to connect to TDengine server
\n
"
);
goto
_dumpin_exit_failure
;
}
char
*
tcommand
=
NULL
;
char
*
charsetOfFile
=
NULL
;
iconv_t
cd
=
(
iconv_t
)(
-
1
);
command
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
lcommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
...
...
@@ -1172,12 +1854,14 @@ int taosDumpIn(SDumpArguments *arguments) {
// Resolve locale
if
(
*
fcharset
!=
'\0'
)
{
arguments
->
encode
=
fcharset
;
charsetOfFile
=
fcharset
;
}
else
{
charsetOfFile
=
encode
;
}
if
(
arguments
->
encode
!=
NULL
&&
strcasecmp
(
tsCharset
,
arguments
->
encod
e
)
!=
0
)
{
cd
=
iconv_open
(
tsCharset
,
arguments
->
encod
e
);
if
(
cd
==
(
iconv_t
)
-
1
)
{
if
(
charsetOfFile
!=
NULL
&&
strcasecmp
(
tsCharset
,
charsetOfFil
e
)
!=
0
)
{
cd
=
iconv_open
(
tsCharset
,
charsetOfFil
e
);
if
(
cd
==
(
(
iconv_t
)(
-
1
))
)
{
fprintf
(
stderr
,
"Failed to open iconv handle
\n
"
);
goto
_dumpin_exit_failure
;
}
...
...
@@ -1196,19 +1880,18 @@ int taosDumpIn(SDumpArguments *arguments) {
pstr
=
command
;
lstr
=
lcommand
;
outbytesleft
=
COMMAND_SIZE
;
if
(
cd
!=
(
iconv_t
)
-
1
)
{
if
(
cd
!=
(
(
iconv_t
)(
-
1
))
)
{
iconv
(
cd
,
&
pstr
,
&
inbytesleft
,
&
lstr
,
&
outbytesleft
);
tcommand
=
lcommand
;
}
else
{
tcommand
=
command
;
}
taosReplaceCtrlChar
(
tcommand
);
TAOS_RES
*
result
=
taos_query
(
taos
,
tcommand
);
if
(
taos_errno
(
result
)
!=
0
){
fprintf
(
stderr
,
"linenu: %"
PRId64
" failed to run command %s reason:%s
\n
continue...
\n
"
,
linenu
,
command
,
taos_errstr
(
result
));
taos_free_result
(
result
);
if
(
queryDB
(
taos
,
tcommand
)
!=
0
)
{
fprintf
(
stderr
,
"error sql: linenu: %"
PRId64
" failed
\n
"
,
linenu
);
exit
(
0
);
}
pstr
=
command
;
...
...
@@ -1248,21 +1931,17 @@ int taosDumpIn(SDumpArguments *arguments) {
pstr
=
command
;
lstr
=
lcommand
;
outbytesleft
=
COMMAND_SIZE
;
if
(
cd
!=
(
iconv_t
)
-
1
)
{
if
(
cd
!=
(
(
iconv_t
)(
-
1
))
)
{
iconv
(
cd
,
&
pstr
,
&
inbytesleft
,
&
lstr
,
&
outbytesleft
);
tcommand
=
lcommand
;
}
else
{
tcommand
=
command
;
}
taosReplaceCtrlChar
(
tcommand
);
TAOS_RES
*
result
=
taos_query
(
taos
,
tcommand
);
int32_t
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"linenu:%"
PRId64
" failed to run command %s reason: %s
\n
continue...
\n
"
,
linenu
,
command
,
taos_errstr
(
result
));
if
(
queryDB
(
taos
,
tcommand
)
!=
0
)
{
fprintf
(
stderr
,
"error sql: linenu:%"
PRId64
" failed
\n
"
,
linenu
);
exit
(
0
);
}
taos_free_result
(
result
);
}
pstr
=
command
;
...
...
@@ -1276,19 +1955,18 @@ int taosDumpIn(SDumpArguments *arguments) {
pstr
=
command
;
lstr
=
lcommand
;
outbytesleft
=
COMMAND_SIZE
;
if
(
cd
!=
(
iconv_t
)
-
1
)
{
if
(
cd
!=
(
(
iconv_t
)(
-
1
))
)
{
iconv
(
cd
,
&
pstr
,
&
inbytesleft
,
&
lstr
,
&
outbytesleft
);
tcommand
=
lcommand
;
}
else
{
tcommand
=
command
;
}
taosReplaceCtrlChar
(
lcommand
);
if
(
taos_query
(
taos
,
tcommand
)
==
NULL
)
fprintf
(
stderr
,
"linenu:%"
PRId64
" failed to run command %s reason:%s
\n
continue...
\n
"
,
linenu
,
command
,
taos_errstr
(
taos
));
if
(
queryDB
(
taos
,
tcommand
)
!=
0
)
fprintf
(
stderr
,
"error sql: linenu:%"
PRId64
" failed
\n
"
,
linenu
);
}
if
(
cd
!=
(
iconv_t
)
-
1
)
iconv_close
(
cd
);
if
(
cd
!=
(
(
iconv_t
)(
-
1
))
)
iconv_close
(
cd
);
tfree
(
line
);
tfree
(
command
);
tfree
(
lcommand
);
...
...
@@ -1297,7 +1975,7 @@ int taosDumpIn(SDumpArguments *arguments) {
return
0
;
_dumpin_exit_failure:
if
(
cd
!=
(
iconv_t
)
-
1
)
iconv_close
(
cd
);
if
(
cd
!=
(
(
iconv_t
)(
-
1
))
)
iconv_close
(
cd
);
tfree
(
command
);
tfree
(
lcommand
);
taos_close
(
taos
);
...
...
@@ -1305,97 +1983,143 @@ _dumpin_exit_failure:
return
-
1
;
}
char
*
ascii_literal_list
[]
=
{
"
\\
x00"
,
"
\\
x01"
,
"
\\
x02"
,
"
\\
x03"
,
"
\\
x04"
,
"
\\
x05"
,
"
\\
x06"
,
"
\\
x07"
,
"
\\
x08"
,
"
\\
t"
,
"
\\
n"
,
"
\\
x0b"
,
"
\\
x0c"
,
"
\\
r"
,
"
\\
x0e"
,
"
\\
x0f"
,
"
\\
x10"
,
"
\\
x11"
,
"
\\
x12"
,
"
\\
x13"
,
"
\\
x14"
,
"
\\
x15"
,
"
\\
x16"
,
"
\\
x17"
,
"
\\
x18"
,
"
\\
x19"
,
"
\\
x1a"
,
"
\\
x1b"
,
"
\\
x1c"
,
"
\\
x1d"
,
"
\\
x1e"
,
"
\\
x1f"
,
" "
,
"!"
,
"
\\\"
"
,
"#"
,
"$"
,
"%"
,
"&"
,
"
\\
'"
,
"("
,
")"
,
"*"
,
"+"
,
","
,
"-"
,
"."
,
"/"
,
"0"
,
"1"
,
"2"
,
"3"
,
"4"
,
"5"
,
"6"
,
"7"
,
"8"
,
"9"
,
":"
,
";"
,
"<"
,
"="
,
">"
,
"?"
,
"@"
,
"A"
,
"B"
,
"C"
,
"D"
,
"E"
,
"F"
,
"G"
,
"H"
,
"I"
,
"J"
,
"K"
,
"L"
,
"M"
,
"N"
,
"O"
,
"P"
,
"Q"
,
"R"
,
"S"
,
"T"
,
"U"
,
"V"
,
"W"
,
"X"
,
"Y"
,
"Z"
,
"["
,
"
\\\\
"
,
"]"
,
"^"
,
"_"
,
"`"
,
"a"
,
"b"
,
"c"
,
"d"
,
"e"
,
"f"
,
"g"
,
"h"
,
"i"
,
"j"
,
"k"
,
"l"
,
"m"
,
"n"
,
"o"
,
"p"
,
"q"
,
"r"
,
"s"
,
"t"
,
"u"
,
"v"
,
"w"
,
"x"
,
"y"
,
"z"
,
"{"
,
"|"
,
"}"
,
"~"
,
"
\\
x7f"
,
"
\\
x80"
,
"
\\
x81"
,
"
\\
x82"
,
"
\\
x83"
,
"
\\
x84"
,
"
\\
x85"
,
"
\\
x86"
,
"
\\
x87"
,
"
\\
x88"
,
"
\\
x89"
,
"
\\
x8a"
,
"
\\
x8b"
,
"
\\
x8c"
,
"
\\
x8d"
,
"
\\
x8e"
,
"
\\
x8f"
,
"
\\
x90"
,
"
\\
x91"
,
"
\\
x92"
,
"
\\
x93"
,
"
\\
x94"
,
"
\\
x95"
,
"
\\
x96"
,
"
\\
x97"
,
"
\\
x98"
,
"
\\
x99"
,
"
\\
x9a"
,
"
\\
x9b"
,
"
\\
x9c"
,
"
\\
x9d"
,
"
\\
x9e"
,
"
\\
x9f"
,
"
\\
xa0"
,
"
\\
xa1"
,
"
\\
xa2"
,
"
\\
xa3"
,
"
\\
xa4"
,
"
\\
xa5"
,
"
\\
xa6"
,
"
\\
xa7"
,
"
\\
xa8"
,
"
\\
xa9"
,
"
\\
xaa"
,
"
\\
xab"
,
"
\\
xac"
,
"
\\
xad"
,
"
\\
xae"
,
"
\\
xaf"
,
"
\\
xb0"
,
"
\\
xb1"
,
"
\\
xb2"
,
"
\\
xb3"
,
"
\\
xb4"
,
"
\\
xb5"
,
"
\\
xb6"
,
"
\\
xb7"
,
"
\\
xb8"
,
"
\\
xb9"
,
"
\\
xba"
,
"
\\
xbb"
,
"
\\
xbc"
,
"
\\
xbd"
,
"
\\
xbe"
,
"
\\
xbf"
,
"
\\
xc0"
,
"
\\
xc1"
,
"
\\
xc2"
,
"
\\
xc3"
,
"
\\
xc4"
,
"
\\
xc5"
,
"
\\
xc6"
,
"
\\
xc7"
,
"
\\
xc8"
,
"
\\
xc9"
,
"
\\
xca"
,
"
\\
xcb"
,
"
\\
xcc"
,
"
\\
xcd"
,
"
\\
xce"
,
"
\\
xcf"
,
"
\\
xd0"
,
"
\\
xd1"
,
"
\\
xd2"
,
"
\\
xd3"
,
"
\\
xd4"
,
"
\\
xd5"
,
"
\\
xd6"
,
"
\\
xd7"
,
"
\\
xd8"
,
"
\\
xd9"
,
"
\\
xda"
,
"
\\
xdb"
,
"
\\
xdc"
,
"
\\
xdd"
,
"
\\
xde"
,
"
\\
xdf"
,
"
\\
xe0"
,
"
\\
xe1"
,
"
\\
xe2"
,
"
\\
xe3"
,
"
\\
xe4"
,
"
\\
xe5"
,
"
\\
xe6"
,
"
\\
xe7"
,
"
\\
xe8"
,
"
\\
xe9"
,
"
\\
xea"
,
"
\\
xeb"
,
"
\\
xec"
,
"
\\
xed"
,
"
\\
xee"
,
"
\\
xef"
,
"
\\
xf0"
,
"
\\
xf1"
,
"
\\
xf2"
,
"
\\
xf3"
,
"
\\
xf4"
,
"
\\
xf5"
,
"
\\
xf6"
,
"
\\
xf7"
,
"
\\
xf8"
,
"
\\
xf9"
,
"
\\
xfa"
,
"
\\
xfb"
,
"
\\
xfc"
,
"
\\
xfd"
,
"
\\
xfe"
,
"
\\
xff"
};
int
taosDumpInOneFile
(
TAOS
*
taos
,
FILE
*
fp
,
char
*
fcharset
,
char
*
encode
,
char
*
fileName
)
{
int
read_len
=
0
;
char
*
cmd
=
NULL
;
size_t
cmd_len
=
0
;
char
*
line
=
NULL
;
size_t
line_len
=
0
;
int
converStringToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
)
{
char
*
pstr
=
str
;
char
*
pbuf
=
buf
;
while
(
size
>
0
)
{
if
(
*
pstr
==
'\0'
)
break
;
pbuf
=
stpcpy
(
pbuf
,
ascii_literal_list
[((
uint8_t
)(
*
pstr
))]);
pstr
++
;
size
--
;
cmd
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
cmd
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
-
1
;
}
*
pbuf
=
'\0'
;
return
0
;
}
int
convertNCharToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
)
{
char
*
pstr
=
str
;
char
*
pbuf
=
buf
;
// TODO
wchar_t
wc
;
while
(
size
>
0
)
{
if
(
*
pstr
==
'\0'
)
break
;
int
byte_width
=
mbtowc
(
&
wc
,
pstr
,
MB_CUR_MAX
);
int
lineNo
=
0
;
while
((
read_len
=
getline
(
&
line
,
&
line_len
,
fp
))
!=
-
1
)
{
++
lineNo
;
if
(
read_len
>=
COMMAND_SIZE
)
continue
;
line
[
--
read_len
]
=
'\0'
;
if
((
int
)
wc
<
256
)
{
pbuf
=
stpcpy
(
pbuf
,
ascii_literal_list
[(
int
)
wc
]);
}
else
if
(
byte_width
>
0
)
{
memcpy
(
pbuf
,
pstr
,
byte_width
);
pbuf
+=
byte_width
;
//if (read_len == 0 || isCommentLine(line)) { // line starts with #
if
(
read_len
==
0
)
{
continue
;
}
pstr
+=
byte_width
;
if
(
line
[
read_len
-
1
]
==
'\\'
)
{
line
[
read_len
-
1
]
=
' '
;
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
cmd_len
+=
read_len
;
continue
;
}
*
pbuf
=
'\0'
;
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
if
(
queryDB
(
taos
,
cmd
))
{
fprintf
(
stderr
,
"error sql: linenu:%d, file:%s
\n
"
,
lineNo
,
fileName
);
}
memset
(
cmd
,
0
,
COMMAND_SIZE
);
cmd_len
=
0
;
}
tfree
(
cmd
);
tfree
(
line
);
fclose
(
fp
);
return
0
;
}
void
taosDumpCharset
(
FILE
*
fp
)
{
char
charsetline
[
256
];
void
*
taosDumpInWorkThreadFp
(
void
*
arg
)
{
SThreadParaObj
*
pThread
=
(
SThreadParaObj
*
)
arg
;
for
(
int32_t
f
=
0
;
f
<
tsSqlFileNum
;
++
f
)
{
if
(
f
%
pThread
->
totalThreads
==
pThread
->
threadIndex
)
{
char
*
SQLFileName
=
tsDumpInSqlFiles
[
f
];
FILE
*
fp
=
taosOpenDumpInFile
(
SQLFileName
);
if
(
NULL
==
fp
)
{
continue
;
}
fprintf
(
stderr
,
"Success Open input file: %s
\n
"
,
SQLFileName
);
taosDumpInOneFile
(
pThread
->
taosCon
,
fp
,
tsfCharset
,
tsArguments
.
encode
,
SQLFileName
);
}
}
fseek
(
fp
,
0
,
SEEK_SET
);
sprintf
(
charsetline
,
"#!%s
\n
"
,
tsCharset
);
fwrite
(
charsetline
,
strlen
(
charsetline
),
1
,
fp
);
return
NULL
;
}
void
taosLoadFileCharset
(
FILE
*
fp
,
char
*
fcharset
)
{
char
*
line
=
NULL
;
size_t
line_size
=
0
;
static
void
taosStartDumpInWorkThreads
(
struct
arguments
*
args
)
{
pthread_attr_t
thattr
;
SThreadParaObj
*
pThread
;
int32_t
totalThreads
=
args
->
thread_num
;
fseek
(
fp
,
0
,
SEEK_SET
);
ssize_t
size
=
getline
(
&
line
,
&
line_size
,
fp
);
if
(
size
<=
2
)
{
goto
_exit_no_charset
;
if
(
totalThreads
>
tsSqlFileNum
)
{
totalThreads
=
tsSqlFileNum
;
}
if
(
strncmp
(
line
,
"#!"
,
2
)
!=
0
)
{
goto
_exit_no_charset
;
SThreadParaObj
*
threadObj
=
(
SThreadParaObj
*
)
calloc
(
totalThreads
,
sizeof
(
SThreadParaObj
));
for
(
int32_t
t
=
0
;
t
<
totalThreads
;
++
t
)
{
pThread
=
threadObj
+
t
;
pThread
->
threadIndex
=
t
;
pThread
->
totalThreads
=
totalThreads
;
pThread
->
taosCon
=
taos_connect
(
args
->
host
,
args
->
user
,
args
->
password
,
NULL
,
args
->
port
);
if
(
pThread
->
taosCon
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed connect to TDengine, error:%s
\n
"
,
pThread
->
threadIndex
,
taos_errstr
(
pThread
->
taosCon
));
exit
(
0
);
}
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pThread
->
threadID
),
&
thattr
,
taosDumpInWorkThreadFp
,
(
void
*
)
pThread
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed to start
\n
"
,
pThread
->
threadIndex
);
exit
(
0
);
}
if
(
line
[
size
-
1
]
==
'\n'
)
{
line
[
size
-
1
]
=
'\0'
;
size
--
;
}
strcpy
(
fcharset
,
line
+
2
);
tfree
(
line
);
return
;
for
(
int
t
=
0
;
t
<
totalThreads
;
++
t
)
{
pthread_join
(
threadObj
[
t
].
threadID
,
NULL
);
}
_exit_no_charset:
fseek
(
fp
,
0
,
SEEK_SET
);
*
fcharset
=
'\0'
;
tfree
(
line
);
return
;
for
(
int
t
=
0
;
t
<
totalThreads
;
++
t
)
{
taos_close
(
threadObj
[
t
].
taosCon
);
}
free
(
threadObj
);
}
int
taosDumpIn
(
struct
arguments
*
arguments
)
{
assert
(
arguments
->
isDumpIn
);
TAOS
*
taos
=
NULL
;
FILE
*
fp
=
NULL
;
taos
=
taos_connect
(
arguments
->
host
,
arguments
->
user
,
arguments
->
password
,
NULL
,
arguments
->
port
);
if
(
taos
==
NULL
)
{
fprintf
(
stderr
,
"failed to connect to TDengine server
\n
"
);
return
-
1
;
}
taosGetDirectoryFileList
(
arguments
->
inpath
);
if
(
tsDbSqlFile
[
0
]
!=
0
)
{
fp
=
taosOpenDumpInFile
(
tsDbSqlFile
);
if
(
NULL
==
fp
)
{
fprintf
(
stderr
,
"failed to open input file %s
\n
"
,
tsDbSqlFile
);
return
-
1
;
}
fprintf
(
stderr
,
"Success Open input file: %s
\n
"
,
tsDbSqlFile
);
taosLoadFileCharset
(
fp
,
tsfCharset
);
taosDumpInOneFile
(
taos
,
fp
,
tsfCharset
,
arguments
->
encode
,
tsDbSqlFile
);
}
taosStartDumpInWorkThreads
(
arguments
);
taos_close
(
taos
);
taosFreeSQLFiles
();
return
0
;
}
src/kit/taosmigrate/taosmigrate.c
浏览文件 @
807e86a3
...
...
@@ -51,6 +51,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
break
;
case
'f'
:
arguments
->
fqdn
=
arg
;
break
;
case
'g'
:
arguments
->
dnodeGroups
=
arg
;
break
;
...
...
src/kit/taosmigrate/taosmigrateMnodeWal.c
浏览文件 @
807e86a3
...
...
@@ -96,6 +96,7 @@ void walModWalFile(char* walfile) {
if
(
wfd
<
0
)
{
printf
(
"wal:%s, failed to open(%s)
\n
"
,
newWalFile
,
strerror
(
errno
));
free
(
buffer
);
close
(
rfd
);
return
;
}
...
...
@@ -116,6 +117,11 @@ void walModWalFile(char* walfile) {
break
;
}
if
(
pHead
->
len
>=
1024000
-
sizeof
(
SWalHead
))
{
printf
(
"wal:%s, SWalHead.len(%d) overflow, skip the rest of file
\n
"
,
walfile
,
pHead
->
len
);
break
;
}
ret
=
read
(
rfd
,
pHead
->
cont
,
pHead
->
len
);
if
(
ret
!=
pHead
->
len
)
{
printf
(
"wal:%s, failed to read body, skip, len:%d ret:%d
\n
"
,
walfile
,
pHead
->
len
,
ret
);
...
...
src/kit/taosmigrate/taosmigrateVnodeCfg.c
浏览文件 @
807e86a3
...
...
@@ -99,6 +99,8 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
goto
PARSE_OVER
;
}
content
[
maxLen
]
=
(
char
)
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
printf
(
"failed to json parse %s, invalid json format
\n
"
,
cfgFile
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录