Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
d0cb4390
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d0cb4390
编写于
2月 29, 2020
作者:
H
haojun Liao
提交者:
GitHub
2月 29, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1285 from taosdata/feature/lihui
Feature/lihui
上级
a0aecc78
5cb77083
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
1278 addition
and
509 deletion
+1278
-509
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+68
-46
src/kit/taosdump/CMakeLists.txt
src/kit/taosdump/CMakeLists.txt
+4
-4
src/kit/taosdump/taosdump.c
src/kit/taosdump/taosdump.c
+1206
-459
未找到文件。
src/client/src/tscFunctionImpl.c
浏览文件 @
d0cb4390
...
...
@@ -139,10 +139,10 @@ typedef struct STSCompInfo {
}
STSCompInfo
;
typedef
struct
SRateInfo
{
int64_t
CorrectionValue
;
int64_t
firstValue
;
double
CorrectionValue
;
double
firstValue
;
double
lastValue
;
TSKEY
firstKey
;
int64_t
lastValue
;
TSKEY
lastKey
;
int8_t
hasResult
;
// flag to denote has value
bool
isIRate
;
// true for IRate functions, false for Rate functions
...
...
@@ -4389,7 +4389,7 @@ static double do_calc_rate(const SRateInfo* pRateInfo) {
return
0
;
}
int64_t
diff
=
0
;
double
diff
=
0
;
if
(
pRateInfo
->
isIRate
)
{
diff
=
pRateInfo
->
lastValue
;
...
...
@@ -4408,7 +4408,7 @@ static double do_calc_rate(const SRateInfo* pRateInfo) {
double
resultVal
=
((
double
)
diff
)
/
duration
;
pTrace
(
"do_calc_rate() isIRate:%d firstKey:%"
PRId64
" lastKey:%"
PRId64
" firstValue:%
"
PRId64
" lastValue:%"
PRId64
" CorrectionValue:%"
PRId64
"
resultVal:%f"
,
pTrace
(
"do_calc_rate() isIRate:%d firstKey:%"
PRId64
" lastKey:%"
PRId64
" firstValue:%
f lastValue:%f CorrectionValue:%f
resultVal:%f"
,
pRateInfo
->
isIRate
,
pRateInfo
->
firstKey
,
pRateInfo
->
lastKey
,
pRateInfo
->
firstValue
,
pRateInfo
->
lastValue
,
pRateInfo
->
CorrectionValue
,
resultVal
);
return
resultVal
;
...
...
@@ -4426,8 +4426,8 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx) {
pInfo
->
CorrectionValue
=
0
;
pInfo
->
firstKey
=
INT64_MIN
;
pInfo
->
lastKey
=
INT64_MIN
;
pInfo
->
firstValue
=
INT64_MIN
;
pInfo
->
lastValue
=
INT64_MIN
;
pInfo
->
firstValue
=
-
DBL_MAX
;
pInfo
->
lastValue
=
-
DBL_MAX
;
pInfo
->
num
=
0
;
pInfo
->
sum
=
0
;
...
...
@@ -4457,41 +4457,47 @@ static void rate_function(SQLFunctionCtx *pCtx) {
notNullElems
++
;
int64_t
v
=
0
;
double
v
=
0
;
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_TINYINT
:
v
=
(
int64_t
)
GET_INT8_VAL
(
pData
);
v
=
(
double
)
GET_INT8_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
v
=
(
int64_t
)
GET_INT16_VAL
(
pData
);
v
=
(
double
)
GET_INT16_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_INT
:
v
=
(
int64_t
)
GET_INT32_VAL
(
pData
);
v
=
(
double
)
GET_INT32_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
v
=
(
int64_t
)
GET_INT64_VAL
(
pData
);
v
=
(
double
)
GET_INT64_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_FLOAT
:
v
=
(
double
)
GET_FLOAT_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
v
=
(
double
)
GET_DOUBLE_VAL
(
pData
);
break
;
default:
assert
(
0
);
}
if
((
INT64_MIN
==
pRateInfo
->
firstValue
)
||
(
INT64_MIN
==
pRateInfo
->
firstKey
))
{
if
((
-
DBL_MAX
==
pRateInfo
->
firstValue
)
||
(
INT64_MIN
==
pRateInfo
->
firstKey
))
{
pRateInfo
->
firstValue
=
v
;
pRateInfo
->
firstKey
=
primaryKey
[
i
];
pTrace
(
"firstValue:%
"
PRId64
"
firstKey:%"
PRId64
,
pRateInfo
->
firstValue
,
pRateInfo
->
firstKey
);
pTrace
(
"firstValue:%
f
firstKey:%"
PRId64
,
pRateInfo
->
firstValue
,
pRateInfo
->
firstKey
);
}
if
(
INT64_MIN
==
pRateInfo
->
lastValue
)
{
if
(
-
DBL_MAX
==
pRateInfo
->
lastValue
)
{
pRateInfo
->
lastValue
=
v
;
}
else
if
(
v
<
pRateInfo
->
lastValue
)
{
pRateInfo
->
CorrectionValue
+=
pRateInfo
->
lastValue
;
pTrace
(
"CorrectionValue:%
"
PRId64
,
pRateInfo
->
CorrectionValue
);
pTrace
(
"CorrectionValue:%
f"
,
pRateInfo
->
CorrectionValue
);
}
pRateInfo
->
lastValue
=
v
;
pRateInfo
->
lastKey
=
primaryKey
[
i
];
pTrace
(
"lastValue:%
"
PRId64
"
lastKey:%"
PRId64
,
pRateInfo
->
lastValue
,
pRateInfo
->
lastKey
);
pTrace
(
"lastValue:%
f
lastKey:%"
PRId64
,
pRateInfo
->
lastValue
,
pRateInfo
->
lastKey
);
}
if
(
!
pCtx
->
hasNull
)
{
...
...
@@ -4522,30 +4528,36 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
pResInfo
->
interResultBuf
;
TSKEY
*
primaryKey
=
pCtx
->
ptsList
;
int64_t
v
=
0
;
double
v
=
0
;
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_TINYINT
:
v
=
(
int64_t
)
GET_INT8_VAL
(
pData
);
v
=
(
double
)
GET_INT8_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
v
=
(
int64_t
)
GET_INT16_VAL
(
pData
);
v
=
(
double
)
GET_INT16_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_INT
:
v
=
(
int64_t
)
GET_INT32_VAL
(
pData
);
v
=
(
double
)
GET_INT32_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
v
=
(
int64_t
)
GET_INT64_VAL
(
pData
);
v
=
(
double
)
GET_INT64_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_FLOAT
:
v
=
(
double
)
GET_FLOAT_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
v
=
(
double
)
GET_DOUBLE_VAL
(
pData
);
break
;
default:
assert
(
0
);
}
if
((
INT64_MIN
==
pRateInfo
->
firstValue
)
||
(
INT64_MIN
==
pRateInfo
->
firstKey
))
{
if
((
-
DBL_MAX
==
pRateInfo
->
firstValue
)
||
(
INT64_MIN
==
pRateInfo
->
firstKey
))
{
pRateInfo
->
firstValue
=
v
;
pRateInfo
->
firstKey
=
primaryKey
[
index
];
}
if
(
INT64_MIN
==
pRateInfo
->
lastValue
)
{
if
(
-
DBL_MAX
==
pRateInfo
->
lastValue
)
{
pRateInfo
->
lastValue
=
v
;
}
else
if
(
v
<
pRateInfo
->
lastValue
)
{
pRateInfo
->
CorrectionValue
+=
pRateInfo
->
lastValue
;
...
...
@@ -4554,7 +4566,7 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
pRateInfo
->
lastValue
=
v
;
pRateInfo
->
lastKey
=
primaryKey
[
index
];
pTrace
(
"====%p rate_function_f() index:%d lastValue:%
"
PRId64
" lastKey:%"
PRId64
" CorrectionValue:%"
PRId64
,
pCtx
,
index
,
pRateInfo
->
lastValue
,
pRateInfo
->
lastKey
,
pRateInfo
->
CorrectionValue
);
pTrace
(
"====%p rate_function_f() index:%d lastValue:%
f lastKey:%"
PRId64
" CorrectionValue:%f"
,
pCtx
,
index
,
pRateInfo
->
lastValue
,
pRateInfo
->
lastKey
,
pRateInfo
->
CorrectionValue
);
SET_VAL
(
pCtx
,
1
,
1
);
...
...
@@ -4591,7 +4603,7 @@ static void rate_func_merge(SQLFunctionCtx *pCtx) {
numOfNotNull
++
;
memcpy
(
pBuf
,
pInput
,
sizeof
(
SRateInfo
));
pTrace
(
"%p rate_func_merge() isIRate:%d firstKey:%"
PRId64
" lastKey:%"
PRId64
" firstValue:%
"
PRId64
" lastValue:%"
PRId64
" CorrectionValue:%"
PRId64
,
pTrace
(
"%p rate_func_merge() isIRate:%d firstKey:%"
PRId64
" lastKey:%"
PRId64
" firstValue:%
f lastValue:%f CorrectionValue:%f"
,
pCtx
,
pInput
->
isIRate
,
pInput
->
firstKey
,
pInput
->
lastKey
,
pInput
->
firstValue
,
pInput
->
lastValue
,
pInput
->
CorrectionValue
);
}
...
...
@@ -4614,17 +4626,15 @@ static void rate_func_copy(SQLFunctionCtx *pCtx) {
pResInfo
->
hasResult
=
((
SRateInfo
*
)
pCtx
->
aInputElemBuf
)
->
hasResult
;
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
pCtx
->
aInputElemBuf
;
pTrace
(
"%p rate_func_second_merge() firstKey:%"
PRId64
" lastKey:%"
PRId64
" firstValue:%
"
PRId64
" lastValue:%"
PRId64
" CorrectionValue:%"
PRId64
"
hasResult:%d"
,
pTrace
(
"%p rate_func_second_merge() firstKey:%"
PRId64
" lastKey:%"
PRId64
" firstValue:%
f lastValue:%f CorrectionValue:%f
hasResult:%d"
,
pCtx
,
pRateInfo
->
firstKey
,
pRateInfo
->
lastKey
,
pRateInfo
->
firstValue
,
pRateInfo
->
lastValue
,
pRateInfo
->
CorrectionValue
,
pRateInfo
->
hasResult
);
}
static
void
rate_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
SResultInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
pResInfo
->
interResultBuf
;
pTrace
(
"%p isIRate:%d firstKey:%"
PRId64
" lastKey:%"
PRId64
" firstValue:%
"
PRId64
" lastValue:%"
PRId64
" CorrectionValue:%"
PRId64
"
hasResult:%d"
,
pTrace
(
"%p isIRate:%d firstKey:%"
PRId64
" lastKey:%"
PRId64
" firstValue:%
f lastValue:%f CorrectionValue:%f
hasResult:%d"
,
pCtx
,
pRateInfo
->
isIRate
,
pRateInfo
->
firstKey
,
pRateInfo
->
lastKey
,
pRateInfo
->
firstValue
,
pRateInfo
->
lastValue
,
pRateInfo
->
CorrectionValue
,
pRateInfo
->
hasResult
);
if
(
pRateInfo
->
hasResult
!=
DATA_SET_FLAG
)
{
...
...
@@ -4650,7 +4660,7 @@ static void irate_function(SQLFunctionCtx *pCtx) {
int32_t
notNullElems
=
0
;
SResultInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
pResInfo
->
interResultBuf
;
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
pResInfo
->
interResultBuf
;
TSKEY
*
primaryKey
=
pCtx
->
ptsList
;
pTrace
(
"%p irate_function() size:%d, hasNull:%d"
,
pCtx
,
pCtx
->
size
,
pCtx
->
hasNull
);
...
...
@@ -4668,38 +4678,44 @@ static void irate_function(SQLFunctionCtx *pCtx) {
notNullElems
++
;
int64_t
v
=
0
;
double
v
=
0
;
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_TINYINT
:
v
=
(
int64_t
)
GET_INT8_VAL
(
pData
);
v
=
(
double
)
GET_INT8_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
v
=
(
int64_t
)
GET_INT16_VAL
(
pData
);
v
=
(
double
)
GET_INT16_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_INT
:
v
=
(
int64_t
)
GET_INT32_VAL
(
pData
);
v
=
(
double
)
GET_INT32_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
v
=
(
int64_t
)
GET_INT64_VAL
(
pData
);
v
=
(
double
)
GET_INT64_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_FLOAT
:
v
=
(
double
)
GET_FLOAT_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
v
=
(
double
)
GET_DOUBLE_VAL
(
pData
);
break
;
default:
assert
(
0
);
}
// TODO: calc once if only call this function once ????
if
((
INT64_MIN
==
pRateInfo
->
lastKey
)
||
(
INT64_MIN
==
pRateInfo
->
lastValue
))
{
if
((
INT64_MIN
==
pRateInfo
->
lastKey
)
||
(
-
DBL_MAX
==
pRateInfo
->
lastValue
))
{
pRateInfo
->
lastValue
=
v
;
pRateInfo
->
lastKey
=
primaryKey
[
i
];
pTrace
(
"%p irate_function() lastValue:%
"
PRId64
"
lastKey:%"
PRId64
,
pCtx
,
pRateInfo
->
lastValue
,
pRateInfo
->
lastKey
);
pTrace
(
"%p irate_function() lastValue:%
f
lastKey:%"
PRId64
,
pCtx
,
pRateInfo
->
lastValue
,
pRateInfo
->
lastKey
);
continue
;
}
if
((
INT64_MIN
==
pRateInfo
->
firstKey
)
||
(
INT64_MIN
==
pRateInfo
->
firstValue
)){
if
((
INT64_MIN
==
pRateInfo
->
firstKey
)
||
(
-
DBL_MAX
==
pRateInfo
->
firstValue
)){
pRateInfo
->
firstValue
=
v
;
pRateInfo
->
firstKey
=
primaryKey
[
i
];
pTrace
(
"%p irate_function() firstValue:%
"
PRId64
"
firstKey:%"
PRId64
,
pCtx
,
pRateInfo
->
firstValue
,
pRateInfo
->
firstKey
);
pTrace
(
"%p irate_function() firstValue:%
f
firstKey:%"
PRId64
,
pCtx
,
pRateInfo
->
firstValue
,
pRateInfo
->
firstKey
);
break
;
}
}
...
...
@@ -4728,19 +4744,25 @@ static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
pResInfo
->
interResultBuf
;
TSKEY
*
primaryKey
=
pCtx
->
ptsList
;
int64_t
v
=
0
;
double
v
=
0
;
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_TINYINT
:
v
=
(
int64_t
)
GET_INT8_VAL
(
pData
);
v
=
(
double
)
GET_INT8_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
v
=
(
int64_t
)
GET_INT16_VAL
(
pData
);
v
=
(
double
)
GET_INT16_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_INT
:
v
=
(
int64_t
)
GET_INT32_VAL
(
pData
);
v
=
(
double
)
GET_INT32_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
v
=
(
int64_t
)
GET_INT64_VAL
(
pData
);
v
=
(
double
)
GET_INT64_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_FLOAT
:
v
=
(
double
)
GET_FLOAT_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
v
=
(
double
)
GET_DOUBLE_VAL
(
pData
);
break
;
default:
assert
(
0
);
...
...
@@ -4752,7 +4774,7 @@ static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
pRateInfo
->
lastValue
=
v
;
pRateInfo
->
lastKey
=
primaryKey
[
index
];
pTrace
(
"====%p irate_function_f() index:%d lastValue:%
"
PRId64
" lastKey:%"
PRId64
" firstValue:%"
PRId64
"
firstKey:%"
PRId64
,
pCtx
,
index
,
pRateInfo
->
lastValue
,
pRateInfo
->
lastKey
,
pRateInfo
->
firstValue
,
pRateInfo
->
firstKey
);
pTrace
(
"====%p irate_function_f() index:%d lastValue:%
f lastKey:%"
PRId64
" firstValue:%f
firstKey:%"
PRId64
,
pCtx
,
index
,
pRateInfo
->
lastValue
,
pRateInfo
->
lastKey
,
pRateInfo
->
firstValue
,
pRateInfo
->
firstKey
);
SET_VAL
(
pCtx
,
1
,
1
);
...
...
src/kit/taosdump/CMakeLists.txt
浏览文件 @
d0cb4390
...
...
@@ -10,11 +10,11 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY
(
. SRC
)
ADD_EXECUTABLE
(
taosdump
${
SRC
}
)
IF
(
TD_PAGMODE_LITE
)
#
IF (TD_PAGMODE_LITE)
TARGET_LINK_LIBRARIES
(
taosdump taos
)
ELSE
()
TARGET_LINK_LIBRARIES
(
taosdump taos_static
)
ENDIF
()
#
ELSE ()
#
TARGET_LINK_LIBRARIES(taosdump taos_static)
#
ENDIF ()
ENDIF
()
src/kit/taosdump/taosdump.c
浏览文件 @
d0cb4390
...
...
@@ -36,7 +36,7 @@
#include "tutil.h"
#define COMMAND_SIZE 65536
#define DEFAULT_DUMP_FILE "taosdump.sql"
//
#define DEFAULT_DUMP_FILE "taosdump.sql"
int
converStringToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
);
int
convertNCharToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
);
...
...
@@ -125,6 +125,16 @@ typedef struct {
STableRecord
tableRecord
;
}
STableRecordInfo
;
typedef
struct
{
pthread_t
threadID
;
int32_t
threadIndex
;
int32_t
totalThreads
;
char
dbName
[
TSDB_METER_NAME_LEN
+
1
];
void
*
taosCon
;
}
SThreadParaObj
;
static
int64_t
totalDumpOutRows
=
0
;
SDbInfo
**
dbInfos
=
NULL
;
const
char
*
argp_program_version
=
version
;
...
...
@@ -139,7 +149,7 @@ static char doc[] = "";
/* to force a line-break, e.g.\n<-- here."; */
/* A description of the arguments we accept. */
static
char
args_doc
[]
=
"dbname [tbname ...]
\n
--databases dbname ...
\n
--all-databases
\n
-i inp
ut_file
"
;
static
char
args_doc
[]
=
"dbname [tbname ...]
\n
--databases dbname ...
\n
--all-databases
\n
-i inp
ath
\n
-o outpath
"
;
/* Keys for options without short-options. */
#define OPT_ABORT 1
/* –abort */
...
...
@@ -147,53 +157,59 @@ static char args_doc[] = "dbname [tbname ...]\n--databases dbname ...\n--all-dat
/* The options we understand. */
static
struct
argp_option
options
[]
=
{
// connection option
{
"host"
,
'h'
,
"HOST"
,
0
,
"Server host dumping data from. Default is localhost."
,
0
},
{
"user"
,
'u'
,
"USER"
,
0
,
"User name used to connect to server. Default is root."
,
0
},
{
"password"
,
'p'
,
"PASSWORD"
,
0
,
"User password to connect to server. Default is taosdata."
,
0
},
{
"port"
,
'P'
,
"PORT"
,
0
,
"Port to connect"
,
0
},
{
"host"
,
'h'
,
"HOST"
,
0
,
"Server host dumping data from. Default is localhost."
,
0
},
{
"user"
,
'u'
,
"USER"
,
0
,
"User name used to connect to server. Default is root."
,
0
},
{
"password"
,
'p'
,
"PASSWORD"
,
0
,
"User password to connect to server. Default is taosdata."
,
0
},
{
"port"
,
'P'
,
"PORT"
,
0
,
"Port to connect"
,
0
},
{
"cversion"
,
'v'
,
"CVERION"
,
0
,
"client version"
,
0
},
// input/output file
{
"outp
ut"
,
'o'
,
"OUTPUT"
,
0
,
"Output file name."
,
1
},
{
"inp
ut"
,
'i'
,
"INPUT"
,
0
,
"Input file name."
,
1
},
{
"config"
,
'c'
,
"CONFIG_DIR"
,
0
,
"Configure directory. Default is /etc/taos/taos.cfg."
,
1
},
{
"encode"
,
'e'
,
"ENCODE"
,
0
,
"Input file encoding."
,
1
},
{
"outp
ath"
,
'o'
,
"OUTPATH"
,
0
,
"Output file path."
,
1
},
{
"inp
ath"
,
'i'
,
"INPATH"
,
0
,
"Input file path."
,
1
},
{
"config"
,
'c'
,
"CONFIG_DIR"
,
0
,
"Configure directory. Default is /etc/taos/taos.cfg."
,
1
},
{
"encode"
,
'e'
,
"ENCODE"
,
0
,
"Input file encoding."
,
1
},
// dump unit options
{
"all-databases"
,
'A'
,
0
,
0
,
"Dump all databases."
,
2
},
{
"databases"
,
'B'
,
0
,
0
,
"Dump assigned databases"
,
2
},
{
"all-databases"
,
'A'
,
0
,
0
,
"Dump all databases."
,
2
},
{
"databases"
,
'B'
,
0
,
0
,
"Dump assigned databases"
,
2
},
// dump format options
{
"schemaonly"
,
's'
,
0
,
0
,
"Only dump schema."
,
3
},
{
"with-property"
,
'M'
,
0
,
0
,
"Dump schema with properties."
,
3
},
{
"start-time"
,
'S'
,
"START_TIME"
,
0
,
"Start time to dump."
,
3
},
{
"end-time"
,
'E'
,
"END_TIME"
,
0
,
"End time to dump."
,
3
},
{
"data-batch"
,
'N'
,
"DATA_BATCH"
,
0
,
"Number of data point per insert statement. Default is 1."
,
3
},
{
"allow-sys"
,
'a'
,
0
,
0
,
"Allow to dump sys database"
,
3
},
{
"schemaonly"
,
's'
,
0
,
0
,
"Only dump schema."
,
3
},
{
"with-property"
,
'M'
,
0
,
0
,
"Dump schema with properties."
,
3
},
{
"start-time"
,
'S'
,
"START_TIME"
,
0
,
"Start time to dump."
,
3
},
{
"end-time"
,
'E'
,
"END_TIME"
,
0
,
"End time to dump."
,
3
},
{
"data-batch"
,
'N'
,
"DATA_BATCH"
,
0
,
"Number of data point per insert statement. Default is 1."
,
3
},
{
"table-batch"
,
'T'
,
"TABLE_BATCH"
,
0
,
"Number of table dumpout into one output file. Default is 1."
,
3
},
{
"thread_num"
,
't'
,
"THREAD_NUM"
,
0
,
"Number of thread for dump in file. Default is 5."
,
3
},
{
"allow-sys"
,
'a'
,
0
,
0
,
"Allow to dump sys database"
,
3
},
{
0
}};
/* Used by main to communicate with parse_opt. */
struct
arguments
{
// connection option
char
*
host
;
char
*
user
;
char
*
password
;
uint16_t
port
;
char
*
host
;
char
*
user
;
char
*
password
;
uint16_t
port
;
char
cversion
[
TSDB_FILENAME_LEN
+
1
];
// output file
char
output
[
TSDB_FILENAME_LEN
+
1
];
char
input
[
TSDB_FILENAME_LEN
+
1
];
char
*
encode
;
char
outpath
[
TSDB_FILENAME_LEN
+
1
];
char
inpath
[
TSDB_FILENAME_LEN
+
1
];
char
*
encode
;
// dump unit option
bool
all_databases
;
bool
databases
;
bool
all_databases
;
bool
databases
;
// dump format option
bool
schemaonly
;
bool
with_property
;
int64_t
start_time
;
int64_t
end_time
;
int
data_batch
;
bool
allow_sys
;
bool
schemaonly
;
bool
with_property
;
int64_t
start_time
;
int64_t
end_time
;
int32_t
data_batch
;
int32_t
table_batch
;
// num of table which will be dump into one output file.
bool
allow_sys
;
// other options
int
abort
;
char
**
arg_list
;
int
arg_list_len
;
bool
isDumpIn
;
int32_t
thread_num
;
int
abort
;
char
**
arg_list
;
int
arg_list_len
;
bool
isDumpIn
;
};
/* Parse a single option. */
...
...
@@ -220,13 +236,21 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case
'P'
:
arguments
->
port
=
atoi
(
arg
);
break
;
// output file
case
'v'
:
if
(
wordexp
(
arg
,
&
full_path
,
0
)
!=
0
)
{
fprintf
(
stderr
,
"Invalid client vesion %s
\n
"
,
arg
);
return
-
1
;
}
strcpy
(
arguments
->
cversion
,
full_path
.
we_wordv
[
0
]);
wordfree
(
&
full_path
);
break
;
// output file path
case
'o'
:
if
(
wordexp
(
arg
,
&
full_path
,
0
)
!=
0
)
{
fprintf
(
stderr
,
"Invalid path %s
\n
"
,
arg
);
return
-
1
;
}
strcpy
(
arguments
->
outp
ut
,
full_path
.
we_wordv
[
0
]);
strcpy
(
arguments
->
outp
ath
,
full_path
.
we_wordv
[
0
]);
wordfree
(
&
full_path
);
break
;
case
'i'
:
...
...
@@ -235,7 +259,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
fprintf
(
stderr
,
"Invalid path %s
\n
"
,
arg
);
return
-
1
;
}
strcpy
(
arguments
->
inp
ut
,
full_path
.
we_wordv
[
0
]);
strcpy
(
arguments
->
inp
ath
,
full_path
.
we_wordv
[
0
]);
wordfree
(
&
full_path
);
break
;
case
'c'
:
...
...
@@ -273,13 +297,19 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case
'N'
:
arguments
->
data_batch
=
atoi
(
arg
);
break
;
case
'T'
:
arguments
->
table_batch
=
atoi
(
arg
);
break
;
case
't'
:
arguments
->
thread_num
=
atoi
(
arg
);
break
;
case
OPT_ABORT
:
arguments
->
abort
=
1
;
break
;
case
ARGP_KEY_ARG
:
arguments
->
arg_list
=
&
state
->
argv
[
state
->
next
-
1
];
arguments
->
arg_list
=
&
state
->
argv
[
state
->
next
-
1
];
arguments
->
arg_list_len
=
state
->
argc
-
state
->
next
+
1
;
state
->
next
=
state
->
argc
;
state
->
next
=
state
->
argc
;
break
;
default:
...
...
@@ -291,53 +321,55 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Our argp parser. */
static
struct
argp
argp
=
{
options
,
parse_opt
,
args_doc
,
doc
};
TAOS
*
taos
=
NULL
;
TAOS_RES
*
result
=
NULL
;
char
*
command
=
NULL
;
char
*
lcommand
=
NULL
;
char
*
buffer
=
NULL
;
int
taosDumpOut
(
struct
arguments
*
arguments
);
int
taosDumpIn
(
struct
arguments
*
arguments
);
void
taosDumpCreateDbClause
(
SDbInfo
*
dbInfo
,
bool
isDumpProperty
,
FILE
*
fp
);
int
taosDumpDb
(
SDbInfo
*
dbInfo
,
struct
arguments
*
arguments
,
FILE
*
fp
);
void
taosDumpCreateTableClause
(
STableDef
*
tableDes
,
int
numOfCols
,
struct
arguments
*
arguments
,
FILE
*
fp
);
void
taosDumpCreateMTableClause
(
STableDef
*
tableDes
,
char
*
metric
,
int
numOfCols
,
struct
arguments
*
arguments
,
FILE
*
fp
);
int32_t
taosDumpTable
(
char
*
table
,
char
*
metric
,
struct
arguments
*
arguments
,
FILE
*
fp
);
int32_t
taosDumpMetric
(
char
*
metric
,
struct
arguments
*
arguments
,
FILE
*
fp
);
int
taosDumpTableData
(
FILE
*
fp
,
char
*
tbname
,
struct
arguments
*
arguments
);
int
taosDumpDb
(
SDbInfo
*
dbInfo
,
struct
arguments
*
arguments
,
FILE
*
fp
,
TAOS
*
taosCon
);
void
taosDumpCreateTableClause
(
STableDef
*
tableDes
,
int
numOfCols
,
FILE
*
fp
);
void
taosDumpCreateMTableClause
(
STableDef
*
tableDes
,
char
*
metric
,
int
numOfCols
,
FILE
*
fp
);
int32_t
taosDumpTable
(
char
*
table
,
char
*
metric
,
struct
arguments
*
arguments
,
FILE
*
fp
,
TAOS
*
taosCon
);
int
taosDumpTableData
(
FILE
*
fp
,
char
*
tbname
,
struct
arguments
*
arguments
,
TAOS
*
taosCon
);
int
taosCheckParam
(
struct
arguments
*
arguments
);
void
taosFreeDbInfos
();
static
void
taosStartDumpOutWorkThreads
(
struct
arguments
*
args
,
int32_t
numOfThread
,
char
*
dbName
);
struct
arguments
tsArguments
=
{
// connection option
NULL
,
"root"
,
"taosdata"
,
0
,
""
,
// outpath and inpath
""
,
""
,
NULL
,
// dump unit option
false
,
false
,
// dump format option
false
,
false
,
0
,
INT64_MAX
,
1
,
1
,
false
,
// other options
5
,
0
,
NULL
,
0
,
false
};
int
main
(
int
argc
,
char
*
argv
[])
{
struct
arguments
arguments
=
{
// connection option
NULL
,
"root"
,
"taosdata"
,
0
,
// output file
DEFAULT_DUMP_FILE
,
DEFAULT_DUMP_FILE
,
NULL
,
// dump unit option
false
,
false
,
// dump format option
false
,
false
,
0
,
INT64_MAX
,
1
,
false
,
// other options
0
,
NULL
,
0
,
false
};
/* Parse our arguments; every option seen by parse_opt will be
reflected in arguments. */
argp_parse
(
&
argp
,
argc
,
argv
,
0
,
0
,
&
a
rguments
);
argp_parse
(
&
argp
,
argc
,
argv
,
0
,
0
,
&
tsA
rguments
);
if
(
a
rguments
.
abort
)
{
if
(
tsA
rguments
.
abort
)
{
#ifndef _ALPINE
error
(
10
,
0
,
"ABORTED"
);
#else
...
...
@@ -345,14 +377,47 @@ int main(int argc, char *argv[]) {
#endif
}
if
(
taosCheckParam
(
&
arguments
)
<
0
)
{
printf
(
"====== arguments config ======
\n
"
);
{
printf
(
"host: %s
\n
"
,
tsArguments
.
host
);
printf
(
"user: %s
\n
"
,
tsArguments
.
user
);
printf
(
"password: %s
\n
"
,
tsArguments
.
password
);
printf
(
"port: %u
\n
"
,
tsArguments
.
port
);
printf
(
"cversion: %s
\n
"
,
tsArguments
.
cversion
);
printf
(
"outpath: %s
\n
"
,
tsArguments
.
outpath
);
printf
(
"inpath: %s
\n
"
,
tsArguments
.
inpath
);
printf
(
"encode: %s
\n
"
,
tsArguments
.
encode
);
printf
(
"all_databases: %d
\n
"
,
tsArguments
.
all_databases
);
printf
(
"databases: %d
\n
"
,
tsArguments
.
databases
);
printf
(
"schemaonly: %d
\n
"
,
tsArguments
.
schemaonly
);
printf
(
"with_property: %d
\n
"
,
tsArguments
.
with_property
);
printf
(
"start_time: %"
PRId64
"
\n
"
,
tsArguments
.
start_time
);
printf
(
"end_time: %"
PRId64
"
\n
"
,
tsArguments
.
end_time
);
printf
(
"data_batch: %d
\n
"
,
tsArguments
.
data_batch
);
printf
(
"table_batch: %d
\n
"
,
tsArguments
.
table_batch
);
printf
(
"allow_sys: %d
\n
"
,
tsArguments
.
allow_sys
);
printf
(
"abort: %d
\n
"
,
tsArguments
.
abort
);
printf
(
"isDumpIn: %d
\n
"
,
tsArguments
.
isDumpIn
);
printf
(
"arg_list_len: %d
\n
"
,
tsArguments
.
arg_list_len
);
for
(
int32_t
i
=
0
;
i
<
tsArguments
.
arg_list_len
;
i
++
)
{
printf
(
"arg_list[%d]: %s
\n
"
,
i
,
tsArguments
.
arg_list
[
i
]);
}
}
printf
(
"==============================
\n
"
);
if
(
tsArguments
.
cversion
[
0
]
!=
0
){
strcpy
(
version
,
tsArguments
.
cversion
);
}
if
(
taosCheckParam
(
&
tsArguments
)
<
0
)
{
exit
(
EXIT_FAILURE
);
}
if
(
a
rguments
.
isDumpIn
)
{
if
(
taosDumpIn
(
&
a
rguments
)
<
0
)
return
-
1
;
if
(
tsA
rguments
.
isDumpIn
)
{
if
(
taosDumpIn
(
&
tsA
rguments
)
<
0
)
return
-
1
;
}
else
{
if
(
taosDumpOut
(
&
a
rguments
)
<
0
)
return
-
1
;
if
(
taosDumpOut
(
&
tsA
rguments
)
<
0
)
return
-
1
;
}
return
0
;
...
...
@@ -364,21 +429,31 @@ void taosFreeDbInfos() {
tfree
(
dbInfos
);
}
int
taosGetTableRecordInfo
(
char
*
table
,
STableRecordInfo
*
pTableRecordInfo
)
{
// check table is normal table or super table
int
taosGetTableRecordInfo
(
char
*
table
,
STableRecordInfo
*
pTableRecordInfo
,
TAOS
*
taosCon
)
{
TAOS_ROW
row
=
NULL
;
bool
isSet
=
false
;
TAOS_RES
*
result
=
NULL
;
memset
(
pTableRecordInfo
,
0
,
sizeof
(
STableRecordInfo
));
sprintf
(
command
,
"show tables like %s"
,
table
);
if
(
t
aos_query
(
taos
,
command
)
!=
0
)
{
fprintf
(
stderr
,
"failed to
run command %s
\n
"
,
command
);
char
*
tempCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
t
empCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to
allocate memory
\n
"
);
return
-
1
;
}
result
=
taos_use_result
(
taos
);
sprintf
(
tempCommand
,
"show tables like %s"
,
table
);
if
(
taos_query
(
taosCon
,
tempCommand
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tempCommand
);
free
(
tempCommand
);
return
-
1
;
}
result
=
taos_use_result
(
taosCon
);
if
(
result
==
NULL
)
{
fprintf
(
stderr
,
"failed to use result
\n
"
);
free
(
tempCommand
);
return
-
1
;
}
...
...
@@ -397,17 +472,22 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) {
taos_free_result
(
result
);
result
=
NULL
;
if
(
isSet
)
return
0
;
sprintf
(
command
,
"show stables like %s"
,
table
);
if
(
taos_query
(
taos
,
command
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
command
);
if
(
isSet
)
{
free
(
tempCommand
);
return
0
;
}
sprintf
(
tempCommand
,
"show stables like %s"
,
table
);
if
(
taos_query
(
taosCon
,
tempCommand
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tempCommand
);
free
(
tempCommand
);
return
-
1
;
}
result
=
taos_use_result
(
taos
);
result
=
taos_use_result
(
taos
Con
);
if
(
result
==
NULL
)
{
fprintf
(
stderr
,
"failed to use result
\n
"
);
free
(
tempCommand
);
return
-
1
;
}
...
...
@@ -421,22 +501,130 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) {
taos_free_result
(
result
);
result
=
NULL
;
if
(
isSet
)
return
0
;
if
(
isSet
)
{
free
(
tempCommand
);
return
0
;
}
fprintf
(
stderr
,
"invalid table/metric %s
\n
"
,
table
);
free
(
tempCommand
);
return
-
1
;
}
int32_t
taosSaveAllNormalTableToTempFile
(
TAOS
*
taosCon
,
char
*
meter
,
int
*
fd
)
{
STableRecord
tableRecord
;
if
(
-
1
==
*
fd
)
{
*
fd
=
open
(
".tables.tmp.0"
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
*
fd
==
-
1
)
{
fprintf
(
stderr
,
"failed to open temp file: .tables.tmp.0
\n
"
);
return
-
1
;
}
}
memset
(
tableRecord
.
name
,
0
,
sizeof
(
STableRecord
));
strcpy
(
tableRecord
.
name
,
meter
);
twrite
(
*
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
return
0
;
}
int32_t
taosSaveTableOfMetricToTempFile
(
TAOS
*
taosCon
,
char
*
metric
,
struct
arguments
*
arguments
,
int32_t
*
totalNumOfThread
)
{
TAOS_ROW
row
;
int
fd
=
-
1
;
STableRecord
tableRecord
;
char
*
tmpCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
-
1
;
}
sprintf
(
tmpCommand
,
"select tbname from %s"
,
metric
);
if
(
taos_query
(
taosCon
,
tmpCommand
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tmpCommand
);
free
(
tmpCommand
);
return
-
1
;
}
TAOS_RES
*
tmpResult
=
NULL
;
tmpResult
=
taos_use_result
(
taosCon
);
if
(
tmpResult
==
NULL
)
{
fprintf
(
stderr
,
"failed to use result
\n
"
);
free
(
tmpCommand
);
return
-
1
;
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpResult
);
int32_t
numOfTable
=
0
;
int32_t
numOfThread
=
*
totalNumOfThread
;
char
tmpFileName
[
TSDB_FILENAME_LEN
+
1
];
while
((
row
=
taos_fetch_row
(
tmpResult
))
!=
NULL
)
{
if
(
0
==
numOfTable
)
{
memset
(
tmpFileName
,
0
,
TSDB_FILENAME_LEN
);
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
numOfThread
);
fd
=
open
(
tmpFileName
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
==
-
1
)
{
fprintf
(
stderr
,
"failed to open temp file: %s
\n
"
,
tmpFileName
);
taos_free_result
(
tmpResult
);
for
(
int32_t
loopCnt
=
0
;
loopCnt
<
numOfThread
;
loopCnt
++
)
{
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
loopCnt
);
remove
(
tmpFileName
);
}
free
(
tmpCommand
);
return
-
1
;
}
numOfThread
++
;
}
memset
(
tableRecord
.
name
,
0
,
sizeof
(
STableRecord
));
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
0
],
fields
[
0
].
bytes
);
strcpy
(
tableRecord
.
metric
,
metric
);
twrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
numOfTable
++
;
if
(
numOfTable
>=
arguments
->
table_batch
)
{
numOfTable
=
0
;
tclose
(
fd
);
fd
=
-
1
;
}
}
tclose
(
fd
);
fd
=
-
1
;
taos_free_result
(
tmpResult
);
*
totalNumOfThread
=
numOfThread
;
free
(
tmpCommand
);
return
0
;
}
int
taosDumpOut
(
struct
arguments
*
arguments
)
{
TAOS
*
taos
=
NULL
;
TAOS_RES
*
result
=
NULL
;
char
*
command
=
NULL
;
TAOS_ROW
row
;
char
*
temp
=
NULL
;
FILE
*
fp
=
NULL
;
int
count
=
0
;
int
32_t
count
=
0
;
STableRecordInfo
tableRecordInfo
;
fp
=
fopen
(
arguments
->
output
,
"w"
);
char
tmpBuf
[
TSDB_FILENAME_LEN
+
1
]
=
{
0
};
if
(
arguments
->
outpath
[
0
]
!=
0
)
{
sprintf
(
tmpBuf
,
"%s/dbs.sql"
,
arguments
->
outpath
);
}
else
{
sprintf
(
tmpBuf
,
"dbs.sql"
);
}
fp
=
fopen
(
tmpBuf
,
"w"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"failed to open file %s
\n
"
,
arguments
->
output
);
fprintf
(
stderr
,
"failed to open file %s
\n
"
,
tmpBuf
);
return
-
1
;
}
...
...
@@ -446,15 +634,12 @@ int taosDumpOut(struct arguments *arguments) {
goto
_exit_failure
;
}
temp
=
(
char
*
)
malloc
(
2
*
COMMAND_SIZE
);
if
(
temp
==
NULL
)
{
command
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
command
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
goto
_exit_failure
;
}
command
=
temp
;
buffer
=
command
+
COMMAND_SIZE
;
/* Connect to server */
taos
=
taos_connect
(
arguments
->
host
,
arguments
->
user
,
arguments
->
password
,
NULL
,
arguments
->
port
);
if
(
taos
==
NULL
)
{
...
...
@@ -482,18 +667,19 @@ int taosDumpOut(struct arguments *arguments) {
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
// sys database name : 'monitor', but subsequent version changed to 'log'
if
(
strncasecmp
(
row
[
TSDB_SHOW_DB_NAME_INDEX
],
"monitor"
,
fields
[
TSDB_SHOW_DB_NAME_INDEX
].
bytes
)
==
0
&&
(
!
arguments
->
allow_sys
))
continue
;
if
(
arguments
->
databases
)
{
if
(
arguments
->
databases
)
{
// input multi dbs
for
(
int
i
=
0
;
arguments
->
arg_list
[
i
];
i
++
)
{
if
(
strncasecmp
(
arguments
->
arg_list
[
i
],
(
char
*
)
row
[
TSDB_SHOW_DB_NAME_INDEX
],
fields
[
TSDB_SHOW_DB_NAME_INDEX
].
bytes
)
==
0
)
goto
_dump_db_point
;
}
continue
;
}
else
if
(
!
arguments
->
all_databases
)
{
}
else
if
(
!
arguments
->
all_databases
)
{
// only input one db
if
(
strncasecmp
(
arguments
->
arg_list
[
0
],
(
char
*
)
row
[
TSDB_SHOW_DB_NAME_INDEX
],
fields
[
TSDB_SHOW_DB_NAME_INDEX
].
bytes
)
==
0
)
goto
_dump_db_point
;
...
...
@@ -510,19 +696,19 @@ int taosDumpOut(struct arguments *arguments) {
}
strncpy
(
dbInfos
[
count
]
->
name
,
(
char
*
)
row
[
TSDB_SHOW_DB_NAME_INDEX
],
fields
[
TSDB_SHOW_DB_NAME_INDEX
].
bytes
);
if
(
strcmp
(
arguments
->
user
,
"root"
)
==
0
)
{
dbInfos
[
count
]
->
replica
=
(
int
)(
*
((
int16_t
*
)
row
[
TSDB_SHOW_DB_REPLICA_INDEX
]));
dbInfos
[
count
]
->
days
=
(
int
)(
*
((
int16_t
*
)
row
[
TSDB_SHOW_DB_DAYS_INDEX
]));
dbInfos
[
count
]
->
keep
=
*
((
int
*
)
row
[
TSDB_SHOW_DB_KEEP_INDEX
]);
dbInfos
[
count
]
->
tables
=
*
((
int
*
)
row
[
TSDB_SHOW_DB_TABLES_INDEX
]);
dbInfos
[
count
]
->
rows
=
*
((
int
*
)
row
[
TSDB_SHOW_DB_ROWS_INDEX
]);
dbInfos
[
count
]
->
cache
=
*
((
int
*
)
row
[
TSDB_SHOW_DB_CACHE_INDEX
]);
dbInfos
[
count
]
->
ablocks
=
*
((
int
*
)
row
[
TSDB_SHOW_DB_ABLOCKS_INDEX
]);
dbInfos
[
count
]
->
tblocks
=
(
int
)(
*
((
int16_t
*
)
row
[
TSDB_SHOW_DB_TBLOCKS_INDEX
]));
dbInfos
[
count
]
->
ctime
=
*
((
int
*
)
row
[
TSDB_SHOW_DB_CTIME_INDEX
]);
dbInfos
[
count
]
->
clog
=
(
int
)(
*
((
int8_t
*
)
row
[
TSDB_SHOW_DB_CLOG_INDEX
]));
dbInfos
[
count
]
->
comp
=
(
int
)(
*
((
int8_t
*
)
row
[
TSDB_SHOW_DB_COMP_INDEX
]));
}
#if 0
dbInfos[count]->replica = (int)(*((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]));
dbInfos[count]->days = (int)(*((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]));
dbInfos[count]->keep = *((int *)row[TSDB_SHOW_DB_KEEP_INDEX]);
dbInfos[count]->tables = *((int *)row[TSDB_SHOW_DB_TABLES_INDEX]);
dbInfos[count]->rows = *((int *)row[TSDB_SHOW_DB_ROWS_INDEX]);
dbInfos[count]->cache = *((int *)row[TSDB_SHOW_DB_CACHE_INDEX]);
dbInfos[count]->ablocks = *((int *)row[TSDB_SHOW_DB_ABLOCKS_INDEX]);
dbInfos[count]->tblocks = (int)(*((int16_t *)row[TSDB_SHOW_DB_TBLOCKS_INDEX]));
dbInfos[count]->ctime = *((int *)row[TSDB_SHOW_DB_CTIME_INDEX]);
dbInfos[count]->clog = (int)(*((int8_t *)row[TSDB_SHOW_DB_CLOG_INDEX]));
dbInfos[count]->comp = (int)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
#endif
count
++
;
...
...
@@ -534,21 +720,19 @@ int taosDumpOut(struct arguments *arguments) {
}
}
taos_free_result
(
result
);
if
(
count
==
0
)
{
fprintf
(
stderr
,
"No databases valid to dump
\n
"
);
goto
_exit_failure
;
}
if
(
arguments
->
databases
||
arguments
->
all_databases
)
{
if
(
arguments
->
databases
||
arguments
->
all_databases
)
{
// case: taosdump --databases dbx dby ... OR taosdump --all-databases
for
(
int
i
=
0
;
i
<
count
;
i
++
)
{
taosDumpDb
(
dbInfos
[
i
],
arguments
,
fp
);
taosDumpDb
(
dbInfos
[
i
],
arguments
,
fp
,
taos
);
}
}
else
{
if
(
arguments
->
arg_list_len
==
1
)
{
taosDumpDb
(
dbInfos
[
0
],
arguments
,
fp
);
}
else
{
if
(
arguments
->
arg_list_len
==
1
)
{
// case: taosdump <db>
taosDumpDb
(
dbInfos
[
0
],
arguments
,
fp
,
taos
);
}
else
{
// case: taosdump <db> tablex tabley ...
taosDumpCreateDbClause
(
dbInfos
[
0
],
arguments
->
with_property
,
fp
);
sprintf
(
command
,
"use %s"
,
dbInfos
[
0
]
->
name
);
...
...
@@ -559,18 +743,42 @@ int taosDumpOut(struct arguments *arguments) {
fprintf
(
fp
,
"USE %s;
\n\n
"
,
dbInfos
[
0
]
->
name
);
int32_t
totalNumOfThread
=
1
;
// 0: all normal talbe into .tables.tmp.0
int
normalTblFd
=
-
1
;
int32_t
retCode
;
for
(
int
i
=
1
;
arguments
->
arg_list
[
i
];
i
++
)
{
if
(
taosGetTableRecordInfo
(
arguments
->
arg_list
[
i
],
&
tableRecordInfo
)
<
0
)
{
fprintf
(
stderr
,
"invalide table %s
\n
"
,
arguments
->
arg_list
[
i
]);
if
(
taosGetTableRecordInfo
(
arguments
->
arg_list
[
i
],
&
tableRecordInfo
,
taos
)
<
0
)
{
fprintf
(
stderr
,
"in
put the in
valide table %s
\n
"
,
arguments
->
arg_list
[
i
]);
continue
;
}
if
(
tableRecordInfo
.
isMetric
)
{
// dump whole metric
taosDumpMetric
(
tableRecordInfo
.
tableRecord
.
metric
,
arguments
,
fp
);
}
else
{
// dump MTable and NTable
taosDumpTable
(
tableRecordInfo
.
tableRecord
.
name
,
tableRecordInfo
.
tableRecord
.
metric
,
arguments
,
fp
);
if
(
tableRecordInfo
.
isMetric
)
{
// dump all table of this metric
retCode
=
taosSaveTableOfMetricToTempFile
(
taos
,
tableRecordInfo
.
tableRecord
.
metric
,
arguments
,
&
totalNumOfThread
);
}
else
{
// dump this normal meter
retCode
=
taosSaveAllNormalTableToTempFile
(
taos
,
tableRecordInfo
.
tableRecord
.
name
,
&
normalTblFd
);
}
if
(
retCode
<
0
)
{
if
(
-
1
!=
normalTblFd
){
tclose
(
normalTblFd
);
}
goto
_clean_tmp_file
;
}
}
if
(
-
1
!=
normalTblFd
){
tclose
(
normalTblFd
);
}
// start multi threads to dumpout
taosStartDumpOutWorkThreads
(
arguments
,
totalNumOfThread
,
dbInfos
[
0
]
->
name
);
char
tmpFileName
[
TSDB_FILENAME_LEN
+
1
];
_clean_tmp_file:
for
(
int
loopCnt
=
0
;
loopCnt
<
totalNumOfThread
;
loopCnt
++
)
{
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
loopCnt
);
remove
(
tmpFileName
);
}
}
}
...
...
@@ -578,346 +786,579 @@ int taosDumpOut(struct arguments *arguments) {
fclose
(
fp
);
taos_close
(
taos
);
taos_free_result
(
result
);
free
(
temp
);
taosFreeDbInfos
();
tfree
(
command
);
taosFreeDbInfos
();
fprintf
(
stderr
,
"dump out rows: %"
PRId64
"
\n
"
,
totalDumpOutRows
);
return
0
;
_exit_failure:
fclose
(
fp
);
taos_close
(
taos
);
taos_free_result
(
result
);
free
(
temp
);
tfree
(
command
);
taosFreeDbInfos
();
fprintf
(
stderr
,
"dump out rows: %"
PRId64
"
\n
"
,
totalDumpOutRows
);
return
-
1
;
}
void
taosDumpCreateDbClause
(
SDbInfo
*
dbInfo
,
bool
isDumpProperty
,
FILE
*
fp
)
{
char
*
pstr
=
buffer
;
pstr
+=
sprintf
(
pstr
,
"CREATE DATABASE IF NOT EXISTS %s"
,
dbInfo
->
name
);
if
(
isDumpProperty
)
{
pstr
+=
sprintf
(
pstr
,
" REPLICA %d DAYS %d KEEP %d TABLES %d ROWS %d CACHE %d ABLOCKS %d TBLOCKS %d CTIME %d CLOG %d COMP %d"
,
dbInfo
->
replica
,
dbInfo
->
days
,
dbInfo
->
keep
,
dbInfo
->
tables
,
dbInfo
->
rows
,
dbInfo
->
cache
,
dbInfo
->
ablocks
,
dbInfo
->
tblocks
,
dbInfo
->
ctime
,
dbInfo
->
clog
,
dbInfo
->
comp
);
}
fprintf
(
fp
,
"%s
\n\n
"
,
buffer
);
}
int
taosDumpDb
(
SDbInfo
*
dbInfo
,
struct
arguments
*
arguments
,
FILE
*
fp
)
{
TAOS_ROW
row
;
int
fd
=
-
1
;
STableRecord
tableRecord
;
taosDumpCreateDbClause
(
dbInfo
,
arguments
->
with_property
,
fp
);
int
taosGetTableDes
(
char
*
table
,
STableDef
*
tableDes
,
TAOS
*
taosCon
)
{
TAOS_ROW
row
=
NULL
;
TAOS_RES
*
tmpResult
=
NULL
;
int
count
=
0
;
sprintf
(
command
,
"use %s"
,
dbInfo
->
name
);
if
(
t
aos_query
(
taos
,
command
)
!=
0
)
{
fprintf
(
stderr
,
"
invalid database %s
\n
"
,
dbInfo
->
name
);
char
*
tempCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
t
empCommand
==
NULL
)
{
fprintf
(
stderr
,
"
failed to allocate memory
\n
"
);
return
-
1
;
}
fprintf
(
fp
,
"USE %s
\n\n
"
,
dbInfo
->
name
);
sprintf
(
command
,
"show tables"
);
if
(
taos_query
(
taos
,
command
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
command
);
sprintf
(
tempCommand
,
"describe %s"
,
table
);
if
(
taos_query
(
taosCon
,
tempCommand
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tempCommand
);
free
(
tempCommand
);
return
-
1
;
}
result
=
taos_use_result
(
taos
);
if
(
r
esult
==
NULL
)
{
tmpResult
=
taos_use_result
(
taosCon
);
if
(
tmpR
esult
==
NULL
)
{
fprintf
(
stderr
,
"failed to use result
\n
"
);
free
(
tempCommand
);
return
-
1
;
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
r
esult
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpR
esult
);
fd
=
open
(
".table.tmp"
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
==
-
1
)
{
fprintf
(
stderr
,
"failed to open temp file
\n
"
);
taos_free_result
(
result
);
return
-
1
;
}
strcpy
(
tableDes
->
name
,
table
);
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_NAME_INDEX
],
fields
[
TSDB_SHOW_TABLES_NAME_INDEX
].
bytes
);
strncpy
(
tableRecord
.
metric
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_METRIC_INDEX
],
fields
[
TSDB_SHOW_TABLES_METRIC_INDEX
].
bytes
);
while
((
row
=
taos_fetch_row
(
tmpResult
))
!=
NULL
)
{
strncpy
(
tableDes
->
cols
[
count
].
field
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_FIELD_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_FIELD_INDEX
].
bytes
);
strncpy
(
tableDes
->
cols
[
count
].
type
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_TYPE_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_TYPE_INDEX
].
bytes
);
tableDes
->
cols
[
count
].
length
=
*
((
int
*
)
row
[
TSDB_DESCRIBE_METRIC_LENGTH_INDEX
]);
strncpy
(
tableDes
->
cols
[
count
].
note
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_NOTE_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_NOTE_INDEX
].
bytes
);
twrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
))
;
count
++
;
}
taos_free_result
(
result
);
lseek
(
fd
,
0
,
SEEK_SET
);
while
(
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
))
>
0
)
{
taosDumpTable
(
tableRecord
.
name
,
tableRecord
.
metric
,
arguments
,
fp
);
}
taos_free_result
(
tmpResult
);
tmpResult
=
NULL
;
tclose
(
fd
);
remove
(
".table.tmp"
);
free
(
tempCommand
);
return
0
;
return
count
;
}
void
taosDumpCreateTableClause
(
STableDef
*
tableDes
,
int
numOfCols
,
struct
arguments
*
arguments
,
FILE
*
fp
)
{
char
*
pstr
=
NULL
;
pstr
=
buffer
;
int
counter
=
0
;
int
count_temp
=
0
;
int32_t
taosDumpTable
(
char
*
table
,
char
*
metric
,
struct
arguments
*
arguments
,
FILE
*
fp
,
TAOS
*
taosCon
)
{
int
count
=
0
;
pstr
+=
sprintf
(
buffer
,
"CREATE TABLE IF NOT EXISTS %s"
,
tableDes
->
name
);
STableDef
*
tableDes
=
(
STableDef
*
)
calloc
(
1
,
sizeof
(
STableDef
)
+
sizeof
(
SColDes
)
*
TSDB_MAX_COLUMNS
);
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
tableDes
->
cols
[
counter
].
note
[
0
]
!=
'\0'
)
break
;
if
(
metric
!=
NULL
&&
metric
[
0
]
!=
'\0'
)
{
// dump table schema which is created by using super table
/*
count = taosGetTableDes(metric, tableDes, taosCon);
if
(
counter
==
0
)
{
pstr
+=
sprintf
(
pstr
,
" (%s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
if (count < 0) {
free(tableDes);
return -1;
}
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"(%d)"
,
tableDes
->
cols
[
counter
].
length
);
}
}
taosDumpCreateTableClause(tableDes, count, fp);
count_temp
=
counter
;
memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
*/
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
counter
==
count_temp
)
{
pstr
+=
sprintf
(
pstr
,
") TAGS (%s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
)
;
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
);
if
(
count
<
0
)
{
free
(
tableDes
);
return
-
1
;
}
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"(%d)"
,
tableDes
->
cols
[
counter
].
length
);
taosDumpCreateMTableClause
(
tableDes
,
metric
,
count
,
fp
);
}
else
{
// dump table definition
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
);
if
(
count
<
0
)
{
free
(
tableDes
);
return
-
1
;
}
taosDumpCreateTableClause
(
tableDes
,
count
,
fp
);
}
pstr
+=
sprintf
(
pstr
,
")"
);
free
(
tableDes
);
fprintf
(
fp
,
"%s
\n\n
"
,
buffer
);
return
taosDumpTableData
(
fp
,
table
,
arguments
,
taosCon
);
}
void
taosDumpCreateMTableClause
(
STableDef
*
tableDes
,
char
*
metric
,
int
numOfCols
,
struct
arguments
*
arguments
,
FILE
*
fp
)
{
char
*
pstr
=
NULL
;
pstr
=
buffer
;
int
counter
=
0
;
int
count_temp
=
0
;
pstr
+=
sprintf
(
buffer
,
"CREATE TABLE IF NOT EXISTS %s USING %s TAGS ("
,
tableDes
->
name
,
metric
);
void
taosDumpCreateDbClause
(
SDbInfo
*
dbInfo
,
bool
isDumpProperty
,
FILE
*
fp
)
{
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
tableDes
->
cols
[
counter
].
note
[
0
]
!=
'\0'
)
break
;
char
*
tmpCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
;
}
assert
(
counter
<
numOfCols
);
count_temp
=
counter
;
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
counter
!=
count_temp
)
{
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",
\'
%s
\'
"
,
tableDes
->
cols
[
counter
].
note
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s"
,
tableDes
->
cols
[
counter
].
note
);
}
}
else
{
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"
\'
%s
\'
"
,
tableDes
->
cols
[
counter
].
note
);
}
else
{
pstr
+=
sprintf
(
pstr
,
"%s"
,
tableDes
->
cols
[
counter
].
note
);
}
/* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */
}
char
*
pstr
=
tmpCommand
;
/* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar")
* == 0) { */
/* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */
/* } */
pstr
+=
sprintf
(
pstr
,
"CREATE DATABASE IF NOT EXISTS %s"
,
dbInfo
->
name
);
if
(
isDumpProperty
)
{
pstr
+=
sprintf
(
pstr
,
" REPLICA %d DAYS %d KEEP %d TABLES %d ROWS %d CACHE %d ABLOCKS %d TBLOCKS %d CTIME %d CLOG %d COMP %d"
,
dbInfo
->
replica
,
dbInfo
->
days
,
dbInfo
->
keep
,
dbInfo
->
tables
,
dbInfo
->
rows
,
dbInfo
->
cache
,
dbInfo
->
ablocks
,
dbInfo
->
tblocks
,
dbInfo
->
ctime
,
dbInfo
->
clog
,
dbInfo
->
comp
);
}
pstr
+=
sprintf
(
pstr
,
")"
);
fprintf
(
fp
,
"%s
\n\n
"
,
buffer
);
fprintf
(
fp
,
"%s
\n\n
"
,
tmpCommand
);
free
(
tmpCommand
);
}
int
taosGetTableDes
(
char
*
table
,
STableDef
*
tableDes
)
{
TAOS_ROW
row
=
NULL
;
int
count
=
0
;
sprintf
(
command
,
"describe %s"
,
table
);
if
(
taos_query
(
taos
,
command
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
command
);
return
-
1
;
void
*
taosDumpOutWorkThreadFp
(
void
*
arg
)
{
SThreadParaObj
*
pThread
=
(
SThreadParaObj
*
)
arg
;
STableRecord
tableRecord
;
int
fd
;
char
tmpFileName
[
TSDB_FILENAME_LEN
+
1
]
=
{
0
};
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
pThread
->
threadIndex
);
fd
=
open
(
tmpFileName
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
==
-
1
)
{
fprintf
(
stderr
,
"taosDumpTableFp() failed to open temp file: %s
\n
"
,
tmpFileName
);
return
NULL
;
}
result
=
taos_use_result
(
taos
);
if
(
result
==
NULL
)
{
fprintf
(
stderr
,
"failed to use result
\n
"
);
return
-
1
;
FILE
*
fp
=
NULL
;
memset
(
tmpFileName
,
0
,
TSDB_FILENAME_LEN
);
if
(
tsArguments
.
outpath
[
0
]
!=
0
)
{
sprintf
(
tmpFileName
,
"%s/%s.tables.%d.sql"
,
tsArguments
.
outpath
,
pThread
->
dbName
,
pThread
->
threadIndex
);
}
else
{
sprintf
(
tmpFileName
,
"%s.tables.%d.sql"
,
pThread
->
dbName
,
pThread
->
threadIndex
);
}
fp
=
fopen
(
tmpFileName
,
"w"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"failed to open file %s
\n
"
,
tmpFileName
);
return
NULL
;
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
memset
(
tmpFileName
,
0
,
TSDB_FILENAME_LEN
);
sprintf
(
tmpFileName
,
"use %s"
,
pThread
->
dbName
);
if
(
taos_query
(
pThread
->
taosCon
,
tmpFileName
)
!=
0
)
{
fprintf
(
stderr
,
"invalid database %s
\n
"
,
pThread
->
dbName
);
return
NULL
;
}
strcpy
(
tableDes
->
name
,
table
);
fprintf
(
fp
,
"USE %s
\n\n
"
,
pThread
->
dbName
);
while
(
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
))
>
0
)
{
taosDumpTable
(
tableRecord
.
name
,
tableRecord
.
metric
,
&
tsArguments
,
fp
,
pThread
->
taosCon
);
}
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
strncpy
(
tableDes
->
cols
[
count
].
field
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_FIELD_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_FIELD_INDEX
].
bytes
);
strncpy
(
tableDes
->
cols
[
count
].
type
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_TYPE_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_TYPE_INDEX
].
bytes
);
tableDes
->
cols
[
count
].
length
=
*
((
int
*
)
row
[
TSDB_DESCRIBE_METRIC_LENGTH_INDEX
]);
strncpy
(
tableDes
->
cols
[
count
].
note
,
(
char
*
)
row
[
TSDB_DESCRIBE_METRIC_NOTE_INDEX
],
fields
[
TSDB_DESCRIBE_METRIC_NOTE_INDEX
].
bytes
);
tclose
(
fd
);
fclose
(
fp
);
count
++
;
return
NULL
;
}
static
void
taosStartDumpOutWorkThreads
(
struct
arguments
*
args
,
int32_t
numOfThread
,
char
*
dbName
)
{
pthread_attr_t
thattr
;
SThreadParaObj
*
threadObj
=
(
SThreadParaObj
*
)
calloc
(
numOfThread
,
sizeof
(
SThreadParaObj
));
for
(
int
t
=
0
;
t
<
numOfThread
;
++
t
)
{
SThreadParaObj
*
pThread
=
threadObj
+
t
;
pThread
->
threadIndex
=
t
;
pThread
->
totalThreads
=
numOfThread
;
strcpy
(
pThread
->
dbName
,
dbName
);
pThread
->
taosCon
=
taos_connect
(
args
->
host
,
args
->
user
,
args
->
password
,
NULL
,
args
->
port
);
if
(
pThread
->
taosCon
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed connect to TDengine, error:%s
\n
"
,
pThread
->
threadIndex
,
taos_errstr
(
pThread
->
taosCon
));
exit
(
0
);
}
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pThread
->
threadID
),
&
thattr
,
taosDumpOutWorkThreadFp
,
(
void
*
)
pThread
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed to start
\n
"
,
pThread
->
threadIndex
);
exit
(
0
);
}
}
taos_free_result
(
result
);
result
=
NULL
;
for
(
int32_t
t
=
0
;
t
<
numOfThread
;
++
t
)
{
pthread_join
(
threadObj
[
t
].
threadID
,
NULL
);
}
return
count
;
for
(
int32_t
t
=
0
;
t
<
numOfThread
;
++
t
)
{
taos_close
(
threadObj
[
t
].
taosCon
);
}
free
(
threadObj
);
}
int32_t
taosDumpTable
(
char
*
table
,
char
*
metric
,
struct
arguments
*
arguments
,
FILE
*
fp
)
{
int32_t
taosDumpStable
(
char
*
table
,
FILE
*
fp
,
TAOS
*
taosCon
)
{
int
count
=
0
;
STableDef
*
tableDes
=
(
STableDef
*
)
calloc
(
1
,
sizeof
(
STableDef
)
+
sizeof
(
SColDes
)
*
TSDB_MAX_COLUMNS
);
if
(
NULL
==
tableDes
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
exit
(
-
1
);
}
if
(
metric
!=
NULL
&&
metric
[
0
]
!=
'\0'
)
{
// dump metric definition
count
=
taosGetTableDes
(
metric
,
tableDes
);
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
);
if
(
count
<
0
)
{
free
(
tableDes
);
return
-
1
;
}
if
(
count
<
0
)
{
free
(
tableDes
);
fprintf
(
stderr
,
"failed to get stable schema
\n
"
);
exit
(
-
1
);
}
taosDumpCreateTableClause
(
tableDes
,
count
,
arguments
,
fp
);
taosDumpCreateTableClause
(
tableDes
,
count
,
fp
);
memset
(
tableDes
,
0
,
sizeof
(
STableDef
)
+
sizeof
(
SColDes
)
*
TSDB_MAX_COLUMNS
);
free
(
tableDes
);
return
0
;
}
count
=
taosGetTableDes
(
table
,
tableDes
);
if
(
count
<
0
)
{
free
(
tableDes
);
return
-
1
;
}
int32_t
taosDumpCreateSuperTableClause
(
TAOS
*
taosCon
,
char
*
dbName
,
FILE
*
fp
)
{
TAOS_ROW
row
;
int
fd
=
-
1
;
STableRecord
tableRecord
;
taosDumpCreateMTableClause
(
tableDes
,
metric
,
count
,
arguments
,
fp
);
char
*
tmpCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
exit
(
-
1
);
}
}
else
{
// dump table definition
count
=
taosGetTableDes
(
table
,
tableDes
);
sprintf
(
tmpCommand
,
"use %s"
,
dbName
);
if
(
taos_query
(
taosCon
,
tmpCommand
)
!=
0
)
{
fprintf
(
stderr
,
"invalid database %s, error: %s
\n
"
,
dbName
,
taos_errstr
(
taosCon
));
free
(
tmpCommand
);
exit
(
-
1
);
}
if
(
count
<
0
)
{
free
(
tableDes
);
return
-
1
;
}
sprintf
(
tmpCommand
,
"show stables"
);
if
(
taos_query
(
taosCon
,
tmpCommand
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, error: %s
\n
"
,
tmpCommand
,
taos_errstr
(
taosCon
));
free
(
tmpCommand
);
exit
(
-
1
);
}
taosDumpCreateTableClause
(
tableDes
,
count
,
arguments
,
fp
);
TAOS_RES
*
tmpResult
=
NULL
;
tmpResult
=
taos_use_result
(
taosCon
);
if
(
tmpResult
==
NULL
)
{
fprintf
(
stderr
,
"failed to use result: %s
\n
"
,
taos_errstr
(
taosCon
));
free
(
tmpCommand
);
exit
(
-
1
);
}
free
(
tableDes
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpResult
);
return
taosDumpTableData
(
fp
,
table
,
arguments
);
char
tmpFileName
[
TSDB_FILENAME_LEN
+
1
];
memset
(
tmpFileName
,
0
,
TSDB_FILENAME_LEN
);
sprintf
(
tmpFileName
,
".stables.tmp"
);
fd
=
open
(
tmpFileName
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
==
-
1
)
{
fprintf
(
stderr
,
"failed to open temp file: %s
\n
"
,
tmpFileName
);
taos_free_result
(
tmpResult
);
free
(
tmpCommand
);
remove
(
".stables.tmp"
);
exit
(
-
1
);
}
while
((
row
=
taos_fetch_row
(
tmpResult
))
!=
NULL
)
{
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_NAME_INDEX
],
fields
[
TSDB_SHOW_TABLES_NAME_INDEX
].
bytes
);
twrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
}
taos_free_result
(
tmpResult
);
lseek
(
fd
,
0
,
SEEK_SET
);
while
(
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
))
>
0
)
{
(
void
)
taosDumpStable
(
tableRecord
.
name
,
fp
,
taosCon
);
}
tclose
(
fd
);
remove
(
".stables.tmp"
);
free
(
tmpCommand
);
return
0
;
}
int32_t
taosDumpMetric
(
char
*
metric
,
struct
arguments
*
arguments
,
FILE
*
fp
)
{
TAOS_ROW
row
=
NULL
;
int
taosDumpDb
(
SDbInfo
*
dbInfo
,
struct
arguments
*
arguments
,
FILE
*
fp
,
TAOS
*
taosCon
)
{
TAOS_ROW
row
;
int
fd
=
-
1
;
STableRecord
tableRecord
;
strcpy
(
tableRecord
.
metric
,
metric
);
taosDumpCreateDbClause
(
dbInfo
,
arguments
->
with_property
,
fp
);
sprintf
(
command
,
"select tbname from %s"
,
metric
);
if
(
t
aos_query
(
taos
,
command
)
!=
0
)
{
fprintf
(
stderr
,
"failed to
run command %s
\n
"
,
command
);
char
*
tmpCommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
t
mpCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to
allocate memory
\n
"
);
return
-
1
;
}
result
=
taos_use_result
(
taos
);
if
(
result
==
NULL
)
{
fprintf
(
stderr
,
"failed to use result
\n
"
);
sprintf
(
tmpCommand
,
"use %s"
,
dbInfo
->
name
);
if
(
taos_query
(
taosCon
,
tmpCommand
)
!=
0
)
{
fprintf
(
stderr
,
"invalid database %s
\n
"
,
dbInfo
->
name
);
free
(
tmpCommand
);
return
-
1
;
}
fd
=
open
(
".table.tmp"
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
<
0
)
{
fprintf
(
stderr
,
"failed to open temp file"
);
fprintf
(
fp
,
"USE %s
\n\n
"
,
dbInfo
->
name
);
(
void
)
taosDumpCreateSuperTableClause
(
taosCon
,
dbInfo
->
name
,
fp
);
sprintf
(
tmpCommand
,
"show tables"
);
if
(
taos_query
(
taosCon
,
tmpCommand
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tmpCommand
);
free
(
tmpCommand
);
return
-
1
;
}
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
TAOS_RES
*
tmpResult
=
NULL
;
tmpResult
=
taos_use_result
(
taosCon
);
if
(
tmpResult
==
NULL
)
{
fprintf
(
stderr
,
"failed to use result
\n
"
);
free
(
tmpCommand
);
return
-
1
;
}
while
((
row
=
taos_fetch_row
(
result
))
!=
NULL
)
{
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpResult
);
int32_t
numOfTable
=
0
;
int32_t
numOfThread
=
0
;
char
tmpFileName
[
TSDB_FILENAME_LEN
+
1
];
while
((
row
=
taos_fetch_row
(
tmpResult
))
!=
NULL
)
{
if
(
0
==
numOfTable
)
{
memset
(
tmpFileName
,
0
,
TSDB_FILENAME_LEN
);
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
numOfThread
);
fd
=
open
(
tmpFileName
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
if
(
fd
==
-
1
)
{
fprintf
(
stderr
,
"failed to open temp file: %s
\n
"
,
tmpFileName
);
taos_free_result
(
tmpResult
);
for
(
int32_t
loopCnt
=
0
;
loopCnt
<
numOfThread
;
loopCnt
++
)
{
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
loopCnt
);
remove
(
tmpFileName
);
}
free
(
tmpCommand
);
return
-
1
;
}
numOfThread
++
;
}
memset
(
&
tableRecord
,
0
,
sizeof
(
STableRecord
));
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
0
],
fields
[
0
].
bytes
);
strcpy
(
tableRecord
.
metric
,
metric
);
strncpy
(
tableRecord
.
name
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_NAME_INDEX
],
fields
[
TSDB_SHOW_TABLES_NAME_INDEX
].
bytes
);
strncpy
(
tableRecord
.
metric
,
(
char
*
)
row
[
TSDB_SHOW_TABLES_METRIC_INDEX
],
fields
[
TSDB_SHOW_TABLES_METRIC_INDEX
].
bytes
);
twrite
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
));
numOfTable
++
;
if
(
numOfTable
>=
arguments
->
table_batch
)
{
numOfTable
=
0
;
tclose
(
fd
);
fd
=
-
1
;
}
}
tclose
(
fd
);
fd
=
-
1
;
taos_free_result
(
tmpResult
);
// start multi threads to dumpout
taosStartDumpOutWorkThreads
(
arguments
,
numOfThread
,
dbInfo
->
name
);
for
(
int
loopCnt
=
0
;
loopCnt
<
numOfThread
;
loopCnt
++
)
{
sprintf
(
tmpFileName
,
".tables.tmp.%d"
,
loopCnt
);
remove
(
tmpFileName
);
}
free
(
tmpCommand
);
taos_free_result
(
result
)
;
result
=
NULL
;
return
0
;
}
lseek
(
fd
,
0
,
SEEK_SET
);
void
taosDumpCreateTableClause
(
STableDef
*
tableDes
,
int
numOfCols
,
FILE
*
fp
)
{
int
counter
=
0
;
int
count_temp
=
0
;
while
(
read
(
fd
,
&
tableRecord
,
sizeof
(
STableRecord
))
>
0
)
{
taosDumpTable
(
tableRecord
.
name
,
tableRecord
.
metric
,
arguments
,
fp
);
char
*
tmpBuf
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpBuf
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
;
}
tclose
(
fd
);
remove
(
".table.tmp"
);
char
*
pstr
=
tmpBuf
;
return
0
;
pstr
+=
sprintf
(
tmpBuf
,
"CREATE TABLE IF NOT EXISTS %s"
,
tableDes
->
name
);
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
tableDes
->
cols
[
counter
].
note
[
0
]
!=
'\0'
)
break
;
if
(
counter
==
0
)
{
pstr
+=
sprintf
(
pstr
,
" (%s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"(%d)"
,
tableDes
->
cols
[
counter
].
length
);
}
}
count_temp
=
counter
;
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
counter
==
count_temp
)
{
pstr
+=
sprintf
(
pstr
,
") TAGS (%s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s %s"
,
tableDes
->
cols
[
counter
].
field
,
tableDes
->
cols
[
counter
].
type
);
}
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"(%d)"
,
tableDes
->
cols
[
counter
].
length
);
}
}
pstr
+=
sprintf
(
pstr
,
")"
);
fprintf
(
fp
,
"%s
\n
"
,
tmpBuf
);
free
(
tmpBuf
);
}
void
taosDumpCreateMTableClause
(
STableDef
*
tableDes
,
char
*
metric
,
int
numOfCols
,
FILE
*
fp
)
{
int
counter
=
0
;
int
count_temp
=
0
;
char
*
tmpBuf
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tmpBuf
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
;
}
char
*
pstr
=
NULL
;
pstr
=
tmpBuf
;
pstr
+=
sprintf
(
tmpBuf
,
"CREATE TABLE IF NOT EXISTS %s USING %s TAGS ("
,
tableDes
->
name
,
metric
);
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
tableDes
->
cols
[
counter
].
note
[
0
]
!=
'\0'
)
break
;
}
assert
(
counter
<
numOfCols
);
count_temp
=
counter
;
for
(;
counter
<
numOfCols
;
counter
++
)
{
if
(
counter
!=
count_temp
)
{
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",
\'
%s
\'
"
,
tableDes
->
cols
[
counter
].
note
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s"
,
tableDes
->
cols
[
counter
].
note
);
}
}
else
{
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"
\'
%s
\'
"
,
tableDes
->
cols
[
counter
].
note
);
}
else
{
pstr
+=
sprintf
(
pstr
,
"%s"
,
tableDes
->
cols
[
counter
].
note
);
}
/* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */
}
/* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar")
* == 0) { */
/* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */
/* } */
}
pstr
+=
sprintf
(
pstr
,
")"
);
fprintf
(
fp
,
"%s
\n
"
,
tmpBuf
);
free
(
tmpBuf
);
}
int
taosDumpTableData
(
FILE
*
fp
,
char
*
tbname
,
struct
arguments
*
arguments
)
{
int
taosDumpTableData
(
FILE
*
fp
,
char
*
tbname
,
struct
arguments
*
arguments
,
TAOS
*
taosCon
)
{
/* char temp[MAX_COMMAND_SIZE] = "\0"; */
int64_t
totalRows
=
0
;
int
count
=
0
;
char
*
pstr
=
NULL
;
TAOS_ROW
row
=
NULL
;
int
numFields
=
0
;
char
*
tbuf
=
NULL
;
if
(
arguments
->
schemaonly
)
return
0
;
char
*
tmpCommand
=
(
char
*
)
calloc
(
1
,
COMMAND_SIZE
);
if
(
tmpCommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
-
1
;
}
char
*
tmpBuffer
=
(
char
*
)
calloc
(
1
,
COMMAND_SIZE
);
if
(
tmpBuffer
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
free
(
tmpCommand
);
return
-
1
;
}
pstr
=
tmpBuffer
;
sprintf
(
command
,
"select * from %s where _c0 >= %"
PRId64
" and _c0 <= %"
PRId64
" order by _c0 asc"
,
tbname
,
arguments
->
start_time
,
if
(
arguments
->
schemaonly
)
{
free
(
tmpCommand
);
free
(
tmpBuffer
);
return
0
;
}
sprintf
(
tmpCommand
,
"select * from %s where _c0 >= %"
PRId64
" and _c0 <= %"
PRId64
" order by _c0 asc"
,
tbname
,
arguments
->
start_time
,
arguments
->
end_time
);
if
(
taos_query
(
taos
,
command
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, reason: %s
\n
"
,
command
,
taos_errstr
(
taos
));
if
(
taos_query
(
taosCon
,
tmpCommand
)
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s, reason: %s
\n
"
,
tmpCommand
,
taos_errstr
(
taosCon
));
free
(
tmpCommand
);
free
(
tmpBuffer
);
return
-
1
;
}
result
=
taos_use_result
(
taos
);
if
(
result
==
NULL
)
{
TAOS_RES
*
tmpResult
=
NULL
;
tmpResult
=
taos_use_result
(
taosCon
);
if
(
tmpResult
==
NULL
)
{
fprintf
(
stderr
,
"failed to use result
\n
"
);
free
(
tmpCommand
);
free
(
tmpBuffer
);
return
-
1
;
}
numFields
=
taos_field_count
(
taos
);
numFields
=
taos_field_count
(
taos
Con
);
assert
(
numFields
>
0
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
r
esult
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpR
esult
);
tbuf
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tbuf
==
NULL
)
{
fprintf
(
stderr
,
"No enough memory
\n
"
);
free
(
tmpCommand
);
free
(
tmpBuffer
);
taos_free_result
(
tmpResult
);
return
-
1
;
}
count
=
0
;
while
((
row
=
taos_fetch_row
(
r
esult
))
!=
NULL
)
{
pstr
=
b
uffer
;
while
((
row
=
taos_fetch_row
(
tmpR
esult
))
!=
NULL
)
{
pstr
=
tmpB
uffer
;
if
(
count
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"I
NSE
RT INTO %s VALUES ("
,
tbname
);
pstr
+=
sprintf
(
pstr
,
"I
MPO
RT INTO %s VALUES ("
,
tbname
);
}
else
{
pstr
+=
sprintf
(
pstr
,
"("
);
}
...
...
@@ -969,24 +1410,32 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments) {
break
;
}
}
pstr
+=
sprintf
(
pstr
,
")"
);
pstr
+=
sprintf
(
pstr
,
")
"
);
totalRows
++
;
count
++
;
fprintf
(
fp
,
"%s"
,
b
uffer
);
fprintf
(
fp
,
"%s"
,
tmpB
uffer
);
if
(
count
>=
arguments
->
data_batch
)
{
fprintf
(
fp
,
"
\n
"
);
count
=
0
;
}
else
{
fprintf
(
fp
,
"
\\\n
"
);
}
}
//
else {
//
fprintf(fp, "\\\n");
//
}
}
atomic_add_fetch_64
(
&
totalDumpOutRows
,
totalRows
);
fprintf
(
fp
,
"
\n
"
);
if
(
tbuf
)
free
(
tbuf
);
taos_free_result
(
result
);
result
=
NULL
;
if
(
tbuf
)
{
free
(
tbuf
);
}
taos_free_result
(
tmpResult
);
tmpResult
=
NULL
;
free
(
tmpCommand
);
free
(
tmpBuffer
);
return
0
;
}
...
...
@@ -1000,23 +1449,29 @@ int taosCheckParam(struct arguments *arguments) {
fprintf
(
stderr
,
"start time is larger than end time
\n
"
);
return
-
1
;
}
if
(
arguments
->
arg_list_len
==
0
)
{
if
((
!
arguments
->
all_databases
)
&&
(
!
arguments
->
isDumpIn
))
{
fprintf
(
stderr
,
"taosdump requires parameters
\n
"
);
return
-
1
;
}
}
if
(
arguments
->
isDumpIn
&&
(
strcmp
(
arguments
->
outp
ut
,
DEFAULT_DUMP_FILE
)
!=
0
))
{
fprintf
(
stderr
,
"duplicate parameter input and output file
\n
"
);
/*
if (arguments->isDumpIn && (strcmp(arguments->outp
ath
, DEFAULT_DUMP_FILE) != 0)) {
fprintf(stderr, "duplicate parameter input and output file
path
\n");
return -1;
}
*/
if
(
!
arguments
->
isDumpIn
&&
arguments
->
encode
!=
NULL
)
{
fprintf
(
stderr
,
"invalid option in dump out
\n
"
);
return
-
1
;
}
if
(
arguments
->
table_batch
<=
0
)
{
fprintf
(
stderr
,
"invalid option in dump out
\n
"
);
return
-
1
;
}
return
0
;
}
...
...
@@ -1075,36 +1530,273 @@ void taosReplaceCtrlChar(char *str) {
*
pstr
=
'\0'
;
}
int
taosDumpIn
(
struct
arguments
*
arguments
)
{
assert
(
arguments
->
isDumpIn
);
char
*
ascii_literal_list
[]
=
{
"
\\
x00"
,
"
\\
x01"
,
"
\\
x02"
,
"
\\
x03"
,
"
\\
x04"
,
"
\\
x05"
,
"
\\
x06"
,
"
\\
x07"
,
"
\\
x08"
,
"
\\
t"
,
"
\\
n"
,
"
\\
x0b"
,
"
\\
x0c"
,
"
\\
r"
,
"
\\
x0e"
,
"
\\
x0f"
,
"
\\
x10"
,
"
\\
x11"
,
"
\\
x12"
,
"
\\
x13"
,
"
\\
x14"
,
"
\\
x15"
,
"
\\
x16"
,
"
\\
x17"
,
"
\\
x18"
,
"
\\
x19"
,
"
\\
x1a"
,
"
\\
x1b"
,
"
\\
x1c"
,
"
\\
x1d"
,
"
\\
x1e"
,
"
\\
x1f"
,
" "
,
"!"
,
"
\\\"
"
,
"#"
,
"$"
,
"%"
,
"&"
,
"
\\
'"
,
"("
,
")"
,
"*"
,
"+"
,
","
,
"-"
,
"."
,
"/"
,
"0"
,
"1"
,
"2"
,
"3"
,
"4"
,
"5"
,
"6"
,
"7"
,
"8"
,
"9"
,
":"
,
";"
,
"<"
,
"="
,
">"
,
"?"
,
"@"
,
"A"
,
"B"
,
"C"
,
"D"
,
"E"
,
"F"
,
"G"
,
"H"
,
"I"
,
"J"
,
"K"
,
"L"
,
"M"
,
"N"
,
"O"
,
"P"
,
"Q"
,
"R"
,
"S"
,
"T"
,
"U"
,
"V"
,
"W"
,
"X"
,
"Y"
,
"Z"
,
"["
,
"
\\\\
"
,
"]"
,
"^"
,
"_"
,
"`"
,
"a"
,
"b"
,
"c"
,
"d"
,
"e"
,
"f"
,
"g"
,
"h"
,
"i"
,
"j"
,
"k"
,
"l"
,
"m"
,
"n"
,
"o"
,
"p"
,
"q"
,
"r"
,
"s"
,
"t"
,
"u"
,
"v"
,
"w"
,
"x"
,
"y"
,
"z"
,
"{"
,
"|"
,
"}"
,
"~"
,
"
\\
x7f"
,
"
\\
x80"
,
"
\\
x81"
,
"
\\
x82"
,
"
\\
x83"
,
"
\\
x84"
,
"
\\
x85"
,
"
\\
x86"
,
"
\\
x87"
,
"
\\
x88"
,
"
\\
x89"
,
"
\\
x8a"
,
"
\\
x8b"
,
"
\\
x8c"
,
"
\\
x8d"
,
"
\\
x8e"
,
"
\\
x8f"
,
"
\\
x90"
,
"
\\
x91"
,
"
\\
x92"
,
"
\\
x93"
,
"
\\
x94"
,
"
\\
x95"
,
"
\\
x96"
,
"
\\
x97"
,
"
\\
x98"
,
"
\\
x99"
,
"
\\
x9a"
,
"
\\
x9b"
,
"
\\
x9c"
,
"
\\
x9d"
,
"
\\
x9e"
,
"
\\
x9f"
,
"
\\
xa0"
,
"
\\
xa1"
,
"
\\
xa2"
,
"
\\
xa3"
,
"
\\
xa4"
,
"
\\
xa5"
,
"
\\
xa6"
,
"
\\
xa7"
,
"
\\
xa8"
,
"
\\
xa9"
,
"
\\
xaa"
,
"
\\
xab"
,
"
\\
xac"
,
"
\\
xad"
,
"
\\
xae"
,
"
\\
xaf"
,
"
\\
xb0"
,
"
\\
xb1"
,
"
\\
xb2"
,
"
\\
xb3"
,
"
\\
xb4"
,
"
\\
xb5"
,
"
\\
xb6"
,
"
\\
xb7"
,
"
\\
xb8"
,
"
\\
xb9"
,
"
\\
xba"
,
"
\\
xbb"
,
"
\\
xbc"
,
"
\\
xbd"
,
"
\\
xbe"
,
"
\\
xbf"
,
"
\\
xc0"
,
"
\\
xc1"
,
"
\\
xc2"
,
"
\\
xc3"
,
"
\\
xc4"
,
"
\\
xc5"
,
"
\\
xc6"
,
"
\\
xc7"
,
"
\\
xc8"
,
"
\\
xc9"
,
"
\\
xca"
,
"
\\
xcb"
,
"
\\
xcc"
,
"
\\
xcd"
,
"
\\
xce"
,
"
\\
xcf"
,
"
\\
xd0"
,
"
\\
xd1"
,
"
\\
xd2"
,
"
\\
xd3"
,
"
\\
xd4"
,
"
\\
xd5"
,
"
\\
xd6"
,
"
\\
xd7"
,
"
\\
xd8"
,
"
\\
xd9"
,
"
\\
xda"
,
"
\\
xdb"
,
"
\\
xdc"
,
"
\\
xdd"
,
"
\\
xde"
,
"
\\
xdf"
,
"
\\
xe0"
,
"
\\
xe1"
,
"
\\
xe2"
,
"
\\
xe3"
,
"
\\
xe4"
,
"
\\
xe5"
,
"
\\
xe6"
,
"
\\
xe7"
,
"
\\
xe8"
,
"
\\
xe9"
,
"
\\
xea"
,
"
\\
xeb"
,
"
\\
xec"
,
"
\\
xed"
,
"
\\
xee"
,
"
\\
xef"
,
"
\\
xf0"
,
"
\\
xf1"
,
"
\\
xf2"
,
"
\\
xf3"
,
"
\\
xf4"
,
"
\\
xf5"
,
"
\\
xf6"
,
"
\\
xf7"
,
"
\\
xf8"
,
"
\\
xf9"
,
"
\\
xfa"
,
"
\\
xfb"
,
"
\\
xfc"
,
"
\\
xfd"
,
"
\\
xfe"
,
"
\\
xff"
};
int
tsize
=
0
;
FILE
*
fp
=
NULL
;
char
*
line
=
NULL
;
_Bool
isRun
=
true
;
size_t
line_size
=
0
;
char
*
pstr
=
NULL
,
*
lstr
=
NULL
;
iconv_t
cd
=
(
iconv_t
)
-
1
;
size_t
inbytesleft
=
0
;
size_t
outbytesleft
=
COMMAND_SIZE
;
char
fcharset
[
64
];
char
*
tcommand
=
NULL
;
fp
=
fopen
(
arguments
->
input
,
"r"
);
int
converStringToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
)
{
char
*
pstr
=
str
;
char
*
pbuf
=
buf
;
while
(
size
>
0
)
{
if
(
*
pstr
==
'\0'
)
break
;
pbuf
=
stpcpy
(
pbuf
,
ascii_literal_list
[((
uint8_t
)(
*
pstr
))]);
pstr
++
;
size
--
;
}
*
pbuf
=
'\0'
;
return
0
;
}
int
convertNCharToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
)
{
char
*
pstr
=
str
;
char
*
pbuf
=
buf
;
// TODO
wchar_t
wc
;
while
(
size
>
0
)
{
if
(
*
pstr
==
'\0'
)
break
;
int
byte_width
=
mbtowc
(
&
wc
,
pstr
,
MB_CUR_MAX
);
if
((
int
)
wc
<
256
)
{
pbuf
=
stpcpy
(
pbuf
,
ascii_literal_list
[(
int
)
wc
]);
}
else
{
memcpy
(
pbuf
,
pstr
,
byte_width
);
pbuf
+=
byte_width
;
}
pstr
+=
byte_width
;
}
*
pbuf
=
'\0'
;
return
0
;
}
void
taosDumpCharset
(
FILE
*
fp
)
{
char
charsetline
[
256
];
fseek
(
fp
,
0
,
SEEK_SET
);
sprintf
(
charsetline
,
"#!%s
\n
"
,
tsCharset
);
fwrite
(
charsetline
,
strlen
(
charsetline
),
1
,
fp
);
}
void
taosLoadFileCharset
(
FILE
*
fp
,
char
*
fcharset
)
{
char
*
line
=
NULL
;
size_t
line_size
=
0
;
fseek
(
fp
,
0
,
SEEK_SET
);
ssize_t
size
=
getline
(
&
line
,
&
line_size
,
fp
);
if
(
size
<=
2
)
{
goto
_exit_no_charset
;
}
if
(
strncmp
(
line
,
"#!"
,
2
)
!=
0
)
{
goto
_exit_no_charset
;
}
if
(
line
[
size
-
1
]
==
'\n'
)
{
line
[
size
-
1
]
=
'\0'
;
size
--
;
}
strcpy
(
fcharset
,
line
+
2
);
tfree
(
line
);
return
;
_exit_no_charset:
fseek
(
fp
,
0
,
SEEK_SET
);
*
fcharset
=
'\0'
;
tfree
(
line
);
return
;
}
// ======== dumpIn support multi threads functions ================================//
static
char
**
tsDumpInSqlFiles
=
NULL
;
static
int32_t
tsSqlFileNum
=
0
;
static
char
tsDbSqlFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
static
char
tsfCharset
[
64
]
=
{
0
};
static
int
taosGetFilesNum
(
const
char
*
directoryName
,
const
char
*
prefix
)
{
char
cmd
[
1024
]
=
{
0
};
sprintf
(
cmd
,
"ls %s/*.%s | wc -l "
,
directoryName
,
prefix
);
FILE
*
fp
=
popen
(
cmd
,
"r"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"
failed to open input file %s
\n
"
,
arguments
->
input
);
return
-
1
;
fprintf
(
stderr
,
"
ERROR: failed to execute:%s, error:%s
\n
"
,
cmd
,
strerror
(
errno
)
);
exit
(
0
)
;
}
taosLoadFileCharset
(
fp
,
fcharset
);
int
fileNum
=
0
;
if
(
fscanf
(
fp
,
"%d"
,
&
fileNum
)
!=
1
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, parse result error
\n
"
,
cmd
);
exit
(
0
);
}
taos
=
taos_connect
(
arguments
->
host
,
arguments
->
user
,
arguments
->
password
,
NULL
,
arguments
->
port
);
if
(
taos
==
NULL
)
{
fprintf
(
stderr
,
"failed to connect to TDengine server
\n
"
);
goto
_dumpin_exit_failure
;
if
(
fileNum
<=
0
)
{
fprintf
(
stderr
,
"ERROR: directory:%s is empry
\n
"
,
directoryName
);
exit
(
0
);
}
command
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
pclose
(
fp
);
return
fileNum
;
}
static
void
taosParseDirectory
(
const
char
*
directoryName
,
const
char
*
prefix
,
char
**
fileArray
,
int
totalFiles
)
{
char
cmd
[
1024
]
=
{
0
};
sprintf
(
cmd
,
"ls %s/*.%s | sort"
,
directoryName
,
prefix
);
FILE
*
fp
=
popen
(
cmd
,
"r"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, error:%s
\n
"
,
cmd
,
strerror
(
errno
));
exit
(
0
);
}
int
fileNum
=
0
;
while
(
fscanf
(
fp
,
"%s"
,
fileArray
[
fileNum
++
]))
{
if
(
strcmp
(
fileArray
[
fileNum
-
1
],
tsDbSqlFile
)
==
0
)
{
fileNum
--
;
}
if
(
fileNum
>=
totalFiles
)
{
break
;
}
}
if
(
fileNum
!=
totalFiles
)
{
fprintf
(
stderr
,
"ERROR: directory:%s changed while read
\n
"
,
directoryName
);
exit
(
0
);
}
pclose
(
fp
);
}
static
void
taosCheckTablesSQLFile
(
const
char
*
directoryName
)
{
char
cmd
[
1024
]
=
{
0
};
sprintf
(
cmd
,
"ls %s/dbs.sql"
,
directoryName
);
FILE
*
fp
=
popen
(
cmd
,
"r"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, error:%s
\n
"
,
cmd
,
strerror
(
errno
));
exit
(
0
);
}
while
(
fscanf
(
fp
,
"%s"
,
tsDbSqlFile
))
{
break
;
}
pclose
(
fp
);
}
static
void
taosMallocSQLFiles
()
{
tsDumpInSqlFiles
=
(
char
**
)
calloc
(
tsSqlFileNum
,
sizeof
(
char
*
));
for
(
int
i
=
0
;
i
<
tsSqlFileNum
;
i
++
)
{
tsDumpInSqlFiles
[
i
]
=
calloc
(
1
,
TSDB_FILENAME_LEN
);
}
}
static
void
taosFreeSQLFiles
()
{
for
(
int
i
=
0
;
i
<
tsSqlFileNum
;
i
++
)
{
tfree
(
tsDumpInSqlFiles
[
i
]);
}
tfree
(
tsDumpInSqlFiles
);
}
static
void
taosGetDirectoryFileList
(
char
*
inputDir
)
{
struct
stat
fileStat
;
if
(
stat
(
inputDir
,
&
fileStat
)
<
0
)
{
fprintf
(
stderr
,
"ERROR: %s not exist
\n
"
,
inputDir
);
exit
(
0
);
}
if
(
fileStat
.
st_mode
&
S_IFDIR
)
{
taosCheckTablesSQLFile
(
inputDir
);
tsSqlFileNum
=
taosGetFilesNum
(
inputDir
,
"sql"
);
int
totalSQLFileNum
=
tsSqlFileNum
;
if
(
tsDbSqlFile
[
0
]
!=
0
)
{
tsSqlFileNum
--
;
}
taosMallocSQLFiles
();
taosParseDirectory
(
inputDir
,
"sql"
,
tsDumpInSqlFiles
,
tsSqlFileNum
);
fprintf
(
stdout
,
"
\n
start to dispose %d files in %s
\n
"
,
totalSQLFileNum
,
inputDir
);
}
else
{
fprintf
(
stderr
,
"ERROR: %s is not a directory
\n
"
,
inputDir
);
exit
(
0
);
}
}
static
FILE
*
taosOpenDumpInFile
(
char
*
fptr
)
{
wordexp_t
full_path
;
if
(
wordexp
(
fptr
,
&
full_path
,
0
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: illegal file name: %s
\n
"
,
fptr
);
return
NULL
;
}
char
*
fname
=
full_path
.
we_wordv
[
0
];
if
(
access
(
fname
,
F_OK
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: file %s is not exist
\n
"
,
fptr
);
wordfree
(
&
full_path
);
return
NULL
;
}
if
(
access
(
fname
,
R_OK
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: file %s is not readable
\n
"
,
fptr
);
wordfree
(
&
full_path
);
return
NULL
;
}
FILE
*
f
=
fopen
(
fname
,
"r"
);
if
(
f
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to open file %s
\n
"
,
fname
);
wordfree
(
&
full_path
);
return
NULL
;
}
wordfree
(
&
full_path
);
return
f
;
}
int
taosDumpInOneFile_old
(
TAOS
*
taos
,
FILE
*
fp
,
char
*
fcharset
,
char
*
encode
)
{
char
*
command
=
NULL
;
char
*
lcommand
=
NULL
;
int
tsize
=
0
;
char
*
line
=
NULL
;
_Bool
isRun
=
true
;
size_t
line_size
=
0
;
char
*
pstr
=
NULL
;
char
*
lstr
=
NULL
;
size_t
inbytesleft
=
0
;
size_t
outbytesleft
=
COMMAND_SIZE
;
char
*
tcommand
=
NULL
;
char
*
charsetOfFile
=
NULL
;
iconv_t
cd
=
(
iconv_t
)(
-
1
);
command
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
lcommand
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
command
==
NULL
||
lcommand
==
NULL
)
{
fprintf
(
stderr
,
"failed to connect to allocate memory
\n
"
);
...
...
@@ -1113,12 +1805,14 @@ int taosDumpIn(struct arguments *arguments) {
// Resolve locale
if
(
*
fcharset
!=
'\0'
)
{
arguments
->
encode
=
fcharset
;
charsetOfFile
=
fcharset
;
}
else
{
charsetOfFile
=
encode
;
}
if
(
arguments
->
encode
!=
NULL
&&
strcasecmp
(
tsCharset
,
arguments
->
encod
e
)
!=
0
)
{
cd
=
iconv_open
(
tsCharset
,
arguments
->
encod
e
);
if
(
cd
==
(
iconv_t
)
-
1
)
{
if
(
charsetOfFile
!=
NULL
&&
strcasecmp
(
tsCharset
,
charsetOfFil
e
)
!=
0
)
{
cd
=
iconv_open
(
tsCharset
,
charsetOfFil
e
);
if
(
cd
==
(
(
iconv_t
)(
-
1
))
)
{
fprintf
(
stderr
,
"Failed to open iconv handle
\n
"
);
goto
_dumpin_exit_failure
;
}
...
...
@@ -1137,17 +1831,21 @@ int taosDumpIn(struct arguments *arguments) {
pstr
=
command
;
lstr
=
lcommand
;
outbytesleft
=
COMMAND_SIZE
;
if
(
cd
!=
(
iconv_t
)
-
1
)
{
if
(
cd
!=
(
(
iconv_t
)(
-
1
))
)
{
iconv
(
cd
,
&
pstr
,
&
inbytesleft
,
&
lstr
,
&
outbytesleft
);
tcommand
=
lcommand
;
}
else
{
tcommand
=
command
;
}
taosReplaceCtrlChar
(
tcommand
);
if
(
taos_query
(
taos
,
tcommand
)
!=
0
)
if
(
taos_query
(
taos
,
tcommand
)
!=
0
)
{
fprintf
(
stderr
,
"linenu: %"
PRId64
" failed to run command %s reason:%s
\n
continue...
\n
"
,
linenu
,
command
,
taos_errstr
(
taos
));
exit
(
0
);
}
pstr
=
command
;
pstr
[
0
]
=
'\0'
;
tsize
=
0
;
...
...
@@ -1185,16 +1883,18 @@ int taosDumpIn(struct arguments *arguments) {
pstr
=
command
;
lstr
=
lcommand
;
outbytesleft
=
COMMAND_SIZE
;
if
(
cd
!=
(
iconv_t
)
-
1
)
{
if
(
cd
!=
(
(
iconv_t
)(
-
1
))
)
{
iconv
(
cd
,
&
pstr
,
&
inbytesleft
,
&
lstr
,
&
outbytesleft
);
tcommand
=
lcommand
;
}
else
{
tcommand
=
command
;
}
taosReplaceCtrlChar
(
tcommand
);
if
(
taos_query
(
taos
,
tcommand
)
!=
0
)
if
(
taos_query
(
taos
,
tcommand
)
!=
0
)
{
fprintf
(
stderr
,
"linenu:%"
PRId64
" failed to run command %s reason: %s
\n
continue...
\n
"
,
linenu
,
command
,
taos_errstr
(
taos
));
exit
(
0
);
}
}
pstr
=
command
;
...
...
@@ -1208,7 +1908,7 @@ int taosDumpIn(struct arguments *arguments) {
pstr
=
command
;
lstr
=
lcommand
;
outbytesleft
=
COMMAND_SIZE
;
if
(
cd
!=
(
iconv_t
)
-
1
)
{
if
(
cd
!=
(
(
iconv_t
)(
-
1
))
)
{
iconv
(
cd
,
&
pstr
,
&
inbytesleft
,
&
lstr
,
&
outbytesleft
);
tcommand
=
lcommand
;
}
else
{
...
...
@@ -1220,7 +1920,7 @@ int taosDumpIn(struct arguments *arguments) {
taos_errstr
(
taos
));
}
if
(
cd
!=
(
iconv_t
)
-
1
)
iconv_close
(
cd
);
if
(
cd
!=
(
(
iconv_t
)(
-
1
))
)
iconv_close
(
cd
);
tfree
(
line
);
tfree
(
command
);
tfree
(
lcommand
);
...
...
@@ -1229,7 +1929,7 @@ int taosDumpIn(struct arguments *arguments) {
return
0
;
_dumpin_exit_failure:
if
(
cd
!=
(
iconv_t
)
-
1
)
iconv_close
(
cd
);
if
(
cd
!=
(
(
iconv_t
)(
-
1
))
)
iconv_close
(
cd
);
tfree
(
command
);
tfree
(
lcommand
);
taos_close
(
taos
);
...
...
@@ -1237,97 +1937,144 @@ _dumpin_exit_failure:
return
-
1
;
}
char
*
ascii_literal_list
[]
=
{
"
\\
x00"
,
"
\\
x01"
,
"
\\
x02"
,
"
\\
x03"
,
"
\\
x04"
,
"
\\
x05"
,
"
\\
x06"
,
"
\\
x07"
,
"
\\
x08"
,
"
\\
t"
,
"
\\
n"
,
"
\\
x0b"
,
"
\\
x0c"
,
"
\\
r"
,
"
\\
x0e"
,
"
\\
x0f"
,
"
\\
x10"
,
"
\\
x11"
,
"
\\
x12"
,
"
\\
x13"
,
"
\\
x14"
,
"
\\
x15"
,
"
\\
x16"
,
"
\\
x17"
,
"
\\
x18"
,
"
\\
x19"
,
"
\\
x1a"
,
"
\\
x1b"
,
"
\\
x1c"
,
"
\\
x1d"
,
"
\\
x1e"
,
"
\\
x1f"
,
" "
,
"!"
,
"
\\\"
"
,
"#"
,
"$"
,
"%"
,
"&"
,
"
\\
'"
,
"("
,
")"
,
"*"
,
"+"
,
","
,
"-"
,
"."
,
"/"
,
"0"
,
"1"
,
"2"
,
"3"
,
"4"
,
"5"
,
"6"
,
"7"
,
"8"
,
"9"
,
":"
,
";"
,
"<"
,
"="
,
">"
,
"?"
,
"@"
,
"A"
,
"B"
,
"C"
,
"D"
,
"E"
,
"F"
,
"G"
,
"H"
,
"I"
,
"J"
,
"K"
,
"L"
,
"M"
,
"N"
,
"O"
,
"P"
,
"Q"
,
"R"
,
"S"
,
"T"
,
"U"
,
"V"
,
"W"
,
"X"
,
"Y"
,
"Z"
,
"["
,
"
\\\\
"
,
"]"
,
"^"
,
"_"
,
"`"
,
"a"
,
"b"
,
"c"
,
"d"
,
"e"
,
"f"
,
"g"
,
"h"
,
"i"
,
"j"
,
"k"
,
"l"
,
"m"
,
"n"
,
"o"
,
"p"
,
"q"
,
"r"
,
"s"
,
"t"
,
"u"
,
"v"
,
"w"
,
"x"
,
"y"
,
"z"
,
"{"
,
"|"
,
"}"
,
"~"
,
"
\\
x7f"
,
"
\\
x80"
,
"
\\
x81"
,
"
\\
x82"
,
"
\\
x83"
,
"
\\
x84"
,
"
\\
x85"
,
"
\\
x86"
,
"
\\
x87"
,
"
\\
x88"
,
"
\\
x89"
,
"
\\
x8a"
,
"
\\
x8b"
,
"
\\
x8c"
,
"
\\
x8d"
,
"
\\
x8e"
,
"
\\
x8f"
,
"
\\
x90"
,
"
\\
x91"
,
"
\\
x92"
,
"
\\
x93"
,
"
\\
x94"
,
"
\\
x95"
,
"
\\
x96"
,
"
\\
x97"
,
"
\\
x98"
,
"
\\
x99"
,
"
\\
x9a"
,
"
\\
x9b"
,
"
\\
x9c"
,
"
\\
x9d"
,
"
\\
x9e"
,
"
\\
x9f"
,
"
\\
xa0"
,
"
\\
xa1"
,
"
\\
xa2"
,
"
\\
xa3"
,
"
\\
xa4"
,
"
\\
xa5"
,
"
\\
xa6"
,
"
\\
xa7"
,
"
\\
xa8"
,
"
\\
xa9"
,
"
\\
xaa"
,
"
\\
xab"
,
"
\\
xac"
,
"
\\
xad"
,
"
\\
xae"
,
"
\\
xaf"
,
"
\\
xb0"
,
"
\\
xb1"
,
"
\\
xb2"
,
"
\\
xb3"
,
"
\\
xb4"
,
"
\\
xb5"
,
"
\\
xb6"
,
"
\\
xb7"
,
"
\\
xb8"
,
"
\\
xb9"
,
"
\\
xba"
,
"
\\
xbb"
,
"
\\
xbc"
,
"
\\
xbd"
,
"
\\
xbe"
,
"
\\
xbf"
,
"
\\
xc0"
,
"
\\
xc1"
,
"
\\
xc2"
,
"
\\
xc3"
,
"
\\
xc4"
,
"
\\
xc5"
,
"
\\
xc6"
,
"
\\
xc7"
,
"
\\
xc8"
,
"
\\
xc9"
,
"
\\
xca"
,
"
\\
xcb"
,
"
\\
xcc"
,
"
\\
xcd"
,
"
\\
xce"
,
"
\\
xcf"
,
"
\\
xd0"
,
"
\\
xd1"
,
"
\\
xd2"
,
"
\\
xd3"
,
"
\\
xd4"
,
"
\\
xd5"
,
"
\\
xd6"
,
"
\\
xd7"
,
"
\\
xd8"
,
"
\\
xd9"
,
"
\\
xda"
,
"
\\
xdb"
,
"
\\
xdc"
,
"
\\
xdd"
,
"
\\
xde"
,
"
\\
xdf"
,
"
\\
xe0"
,
"
\\
xe1"
,
"
\\
xe2"
,
"
\\
xe3"
,
"
\\
xe4"
,
"
\\
xe5"
,
"
\\
xe6"
,
"
\\
xe7"
,
"
\\
xe8"
,
"
\\
xe9"
,
"
\\
xea"
,
"
\\
xeb"
,
"
\\
xec"
,
"
\\
xed"
,
"
\\
xee"
,
"
\\
xef"
,
"
\\
xf0"
,
"
\\
xf1"
,
"
\\
xf2"
,
"
\\
xf3"
,
"
\\
xf4"
,
"
\\
xf5"
,
"
\\
xf6"
,
"
\\
xf7"
,
"
\\
xf8"
,
"
\\
xf9"
,
"
\\
xfa"
,
"
\\
xfb"
,
"
\\
xfc"
,
"
\\
xfd"
,
"
\\
xfe"
,
"
\\
xff"
};
int
taosDumpInOneFile
(
TAOS
*
taos
,
FILE
*
fp
,
char
*
fcharset
,
char
*
encode
)
{
int
read_len
=
0
;
char
*
cmd
=
NULL
;
size_t
cmd_len
=
0
;
char
*
line
=
NULL
;
size_t
line_len
=
0
;
int
converStringToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
)
{
char
*
pstr
=
str
;
char
*
pbuf
=
buf
;
while
(
size
>
0
)
{
if
(
*
pstr
==
'\0'
)
break
;
pbuf
=
stpcpy
(
pbuf
,
ascii_literal_list
[((
uint8_t
)(
*
pstr
))]);
pstr
++
;
size
--
;
cmd
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
cmd
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
return
-
1
;
}
*
pbuf
=
'\0'
;
return
0
;
}
int
convertNCharToReadable
(
char
*
str
,
int
size
,
char
*
buf
,
int
bufsize
)
{
char
*
pstr
=
str
;
char
*
pbuf
=
buf
;
// TODO
wchar_t
wc
;
while
(
size
>
0
)
{
if
(
*
pstr
==
'\0'
)
break
;
int
byte_width
=
mbtowc
(
&
wc
,
pstr
,
MB_CUR_MAX
);
int
lineNo
=
0
;
while
((
read_len
=
getline
(
&
line
,
&
line_len
,
fp
))
!=
-
1
)
{
++
lineNo
;
if
(
read_len
>=
COMMAND_SIZE
)
continue
;
line
[
--
read_len
]
=
'\0'
;
if
((
int
)
wc
<
256
)
{
pbuf
=
stpcpy
(
pbuf
,
ascii_literal_list
[(
int
)
wc
]);
}
else
{
memcpy
(
pbuf
,
pstr
,
byte_width
);
pbuf
+=
byte_width
;
//if (read_len == 0 || isCommentLine(line)) { // line starts with #
if
(
read_len
==
0
)
{
continue
;
}
pstr
+=
byte_width
;
}
*
pbuf
=
'\0'
;
if
(
line
[
read_len
-
1
]
==
'\\'
)
{
line
[
read_len
-
1
]
=
' '
;
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
cmd_len
+=
read_len
;
continue
;
}
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
if
(
taos_query
(
taos
,
cmd
))
{
fprintf
(
stderr
,
"DB error: %s line:%d
\n
"
,
taos_errstr
(
taos
),
lineNo
);
/* free local resouce: allocated memory/metric-meta refcnt */
TAOS_RES
*
pRes
=
taos_use_result
(
taos
);
taos_free_result
(
pRes
);
}
memset
(
cmd
,
0
,
COMMAND_SIZE
);
cmd_len
=
0
;
}
tfree
(
cmd
);
tfree
(
line
);
fclose
(
fp
);
return
0
;
}
void
taosDumpCharset
(
FILE
*
fp
)
{
char
charsetline
[
256
];
void
*
taosDumpInWorkThreadFp
(
void
*
arg
)
{
SThreadParaObj
*
pThread
=
(
SThreadParaObj
*
)
arg
;
for
(
int32_t
f
=
0
;
f
<
tsSqlFileNum
;
++
f
)
{
if
(
f
%
pThread
->
totalThreads
==
pThread
->
threadIndex
)
{
char
*
SQLFileName
=
tsDumpInSqlFiles
[
f
];
FILE
*
fp
=
taosOpenDumpInFile
(
SQLFileName
);
if
(
NULL
==
fp
)
{
continue
;
}
taosDumpInOneFile
(
pThread
->
taosCon
,
fp
,
tsfCharset
,
tsArguments
.
encode
);
}
}
fseek
(
fp
,
0
,
SEEK_SET
);
sprintf
(
charsetline
,
"#!%s
\n
"
,
tsCharset
);
fwrite
(
charsetline
,
strlen
(
charsetline
),
1
,
fp
);
return
NULL
;
}
void
taosLoadFileCharset
(
FILE
*
fp
,
char
*
fcharset
)
{
char
*
line
=
NULL
;
size_t
line_size
=
0
;
static
void
taosStartDumpInWorkThreads
(
struct
arguments
*
args
)
{
pthread_attr_t
thattr
;
SThreadParaObj
*
pThread
;
int32_t
totalThreads
=
args
->
thread_num
;
fseek
(
fp
,
0
,
SEEK_SET
);
ssize_t
size
=
getline
(
&
line
,
&
line_size
,
fp
);
if
(
size
<=
2
)
{
goto
_exit_no_charset
;
if
(
totalThreads
>
tsSqlFileNum
)
{
totalThreads
=
tsSqlFileNum
;
}
SThreadParaObj
*
threadObj
=
(
SThreadParaObj
*
)
calloc
(
totalThreads
,
sizeof
(
SThreadParaObj
));
for
(
int32_t
t
=
0
;
t
<
totalThreads
;
++
t
)
{
pThread
=
threadObj
+
t
;
pThread
->
threadIndex
=
t
;
pThread
->
totalThreads
=
totalThreads
;
pThread
->
taosCon
=
taos_connect
(
args
->
host
,
args
->
user
,
args
->
password
,
NULL
,
args
->
port
);
if
(
pThread
->
taosCon
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed connect to TDengine, error:%s
\n
"
,
pThread
->
threadIndex
,
taos_errstr
(
pThread
->
taosCon
));
exit
(
0
);
}
if
(
strncmp
(
line
,
"#!"
,
2
)
!=
0
)
{
goto
_exit_no_charset
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pThread
->
threadID
),
&
thattr
,
taosDumpInWorkThreadFp
,
(
void
*
)
pThread
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed to start
\n
"
,
pThread
->
threadIndex
);
exit
(
0
);
}
}
if
(
line
[
size
-
1
]
==
'\n'
)
{
line
[
size
-
1
]
=
'\0'
;
size
--
;
for
(
int
t
=
0
;
t
<
totalThreads
;
++
t
)
{
pthread_join
(
threadObj
[
t
].
threadID
,
NULL
)
;
}
strcpy
(
fcharset
,
line
+
2
);
tfree
(
line
);
return
;
for
(
int
t
=
0
;
t
<
totalThreads
;
++
t
)
{
taos_close
(
threadObj
[
t
].
taosCon
);
}
free
(
threadObj
);
}
int
taosDumpIn
(
struct
arguments
*
arguments
)
{
assert
(
arguments
->
isDumpIn
);
TAOS
*
taos
=
NULL
;
FILE
*
fp
=
NULL
;
taos
=
taos_connect
(
arguments
->
host
,
arguments
->
user
,
arguments
->
password
,
NULL
,
arguments
->
port
);
if
(
taos
==
NULL
)
{
fprintf
(
stderr
,
"failed to connect to TDengine server
\n
"
);
return
-
1
;
}
taosGetDirectoryFileList
(
arguments
->
inpath
);
if
(
tsDbSqlFile
[
0
]
!=
0
)
{
fp
=
taosOpenDumpInFile
(
tsDbSqlFile
);
if
(
NULL
==
fp
)
{
fprintf
(
stderr
,
"failed to open input file %s
\n
"
,
tsDbSqlFile
);
return
-
1
;
}
taosLoadFileCharset
(
fp
,
tsfCharset
);
taosDumpInOneFile
(
taos
,
fp
,
tsfCharset
,
arguments
->
encode
);
}
taosStartDumpInWorkThreads
(
arguments
);
taos_close
(
taos
);
taosFreeSQLFiles
();
return
0
;
}
_exit_no_charset:
fseek
(
fp
,
0
,
SEEK_SET
);
*
fcharset
=
'\0'
;
tfree
(
line
);
return
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录