Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
2b21a75a
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2b21a75a
编写于
12月 16, 2019
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
#928
上级
3ed436cc
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
300 addition
and
2 deletion
+300
-2
src/kit/shell/inc/shell.h
src/kit/shell/inc/shell.h
+4
-0
src/kit/shell/src/shellEngine.c
src/kit/shell/src/shellEngine.c
+9
-1
src/kit/shell/src/shellImport.c
src/kit/shell/src/shellImport.c
+261
-0
src/kit/shell/src/shellLinux.c
src/kit/shell/src/shellLinux.c
+13
-0
src/kit/shell/src/shellMain.c
src/kit/shell/src/shellMain.c
+13
-1
未找到文件。
src/kit/shell/inc/shell.h
浏览文件 @
2b21a75a
...
...
@@ -58,6 +58,8 @@ struct arguments {
bool
is_raw_time
;
bool
is_use_passwd
;
char
file
[
TSDB_FILENAME_LEN
];
char
dir
[
TSDB_FILENAME_LEN
];
int
threadNum
;
char
*
commands
;
int
abort
;
};
...
...
@@ -74,12 +76,14 @@ void shellRunCommandOnServer(TAOS* con, char command[]);
void
read_history
();
void
write_history
();
void
source_file
(
TAOS
*
con
,
char
*
fptr
);
void
source_dir
(
TAOS
*
con
,
struct
arguments
*
args
);
void
get_history_path
(
char
*
history
);
void
cleanup_handler
(
void
*
arg
);
void
exitShell
();
int
shellDumpResult
(
TAOS
*
con
,
char
*
fname
,
int
*
error_no
,
bool
printMode
);
void
shellPrintNChar
(
char
*
str
,
int
width
,
bool
printMode
);
void
shellGetGrantInfo
(
void
*
con
);
int
isCommentLine
(
char
*
line
);
#define max(a, b) ((int)(a) < (int)(b) ? (int)(b) : (int)(a))
/**************** Global variable declarations ****************/
...
...
src/kit/shell/src/shellEngine.c
浏览文件 @
2b21a75a
...
...
@@ -110,6 +110,14 @@ TAOS *shellInit(struct arguments *args) {
exit
(
EXIT_SUCCESS
);
}
#ifdef LINUX
if
(
args
->
dir
[
0
]
!=
0
)
{
source_dir
(
con
,
args
);
taos_close
(
con
);
exit
(
EXIT_SUCCESS
);
}
#endif
printf
(
SERVER_VERSION
,
taos_get_server_info
(
con
));
return
con
;
...
...
@@ -762,7 +770,7 @@ void taos_error(TAOS *con) {
taos_free_result
(
pRes
);
}
static
int
isCommentLine
(
char
*
line
)
{
int
isCommentLine
(
char
*
line
)
{
if
(
line
==
NULL
)
return
1
;
return
regex_match
(
line
,
"^
\\
s*#.*"
,
REG_EXTENDED
);
...
...
src/kit/shell/src/shellImport.c
0 → 100644
浏览文件 @
2b21a75a
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _XOPEN_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "shell.h"
#include "shellCommand.h"
#include "ttime.h"
#include "tutil.h"
static
char
**
shellSQLFiles
=
NULL
;
static
int32_t
shellSQLFileNum
=
0
;
static
char
shellTablesSQLFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
typedef
struct
{
pthread_t
threadID
;
int
threadIndex
;
int
totalThreads
;
void
*
taos
;
}
ShellThreadObj
;
static
int
shellGetFilesNum
(
const
char
*
directoryName
,
const
char
*
prefix
)
{
char
cmd
[
1024
]
=
{
0
};
sprintf
(
cmd
,
"ls %s/*.%s | wc -l "
,
directoryName
,
prefix
);
FILE
*
fp
=
popen
(
cmd
,
"r"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, error:%s
\n
"
,
cmd
,
strerror
(
errno
));
exit
(
0
);
}
int
fileNum
=
0
;
if
(
fscanf
(
fp
,
"%d"
,
&
fileNum
)
!=
1
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, parse result error
\n
"
,
cmd
);
exit
(
0
);
}
if
(
fileNum
<=
0
)
{
fprintf
(
stderr
,
"ERROR: directory:%s is empry
\n
"
,
directoryName
);
exit
(
0
);
}
pclose
(
fp
);
return
fileNum
;
}
static
void
shellParseDirectory
(
const
char
*
directoryName
,
const
char
*
prefix
,
char
**
fileArray
,
int
totalFiles
)
{
char
cmd
[
1024
]
=
{
0
};
sprintf
(
cmd
,
"ls %s/*.%s | sort"
,
directoryName
,
prefix
);
FILE
*
fp
=
popen
(
cmd
,
"r"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, error:%s
\n
"
,
cmd
,
strerror
(
errno
));
exit
(
0
);
}
int
fileNum
=
0
;
while
(
fscanf
(
fp
,
"%s"
,
fileArray
[
fileNum
++
]))
{
if
(
strcmp
(
fileArray
[
fileNum
-
1
],
shellTablesSQLFile
)
==
0
)
{
fileNum
--
;
}
if
(
fileNum
>=
totalFiles
)
{
break
;
}
}
if
(
fileNum
!=
totalFiles
)
{
fprintf
(
stderr
,
"ERROR: directory:%s changed while read
\n
"
,
directoryName
);
exit
(
0
);
}
pclose
(
fp
);
}
static
void
shellCheckTablesSQLFile
(
const
char
*
directoryName
)
{
char
cmd
[
1024
]
=
{
0
};
sprintf
(
cmd
,
"ls %s/tables.sql"
,
directoryName
);
FILE
*
fp
=
popen
(
cmd
,
"r"
);
if
(
fp
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to execute:%s, error:%s
\n
"
,
cmd
,
strerror
(
errno
));
exit
(
0
);
}
while
(
fscanf
(
fp
,
"%s"
,
shellTablesSQLFile
))
{
break
;
}
pclose
(
fp
);
}
static
void
shellMallocSQLFiles
()
{
shellSQLFiles
=
(
char
**
)
calloc
(
shellSQLFileNum
,
sizeof
(
char
*
));
for
(
int
i
=
0
;
i
<
shellSQLFileNum
;
i
++
)
{
shellSQLFiles
[
i
]
=
calloc
(
1
,
TSDB_FILENAME_LEN
);
}
}
static
void
shellGetDirectoryFileList
(
char
*
inputDir
)
{
struct
stat
fileStat
;
if
(
stat
(
inputDir
,
&
fileStat
)
<
0
)
{
fprintf
(
stderr
,
"ERROR: %s not exist
\n
"
,
inputDir
);
exit
(
0
);
}
if
(
fileStat
.
st_mode
&
S_IFDIR
)
{
shellCheckTablesSQLFile
(
inputDir
);
shellSQLFileNum
=
shellGetFilesNum
(
inputDir
,
"sql"
);
int
totalSQLFileNum
=
shellSQLFileNum
;
if
(
shellTablesSQLFile
[
0
]
!=
0
)
{
shellSQLFileNum
--
;
}
shellMallocSQLFiles
();
shellParseDirectory
(
inputDir
,
"sql"
,
shellSQLFiles
,
shellSQLFileNum
);
fprintf
(
stdout
,
"start to dispose %d files in %s
\n
"
,
totalSQLFileNum
,
inputDir
);
}
else
{
fprintf
(
stderr
,
"ERROR: %s is not a directory
\n
"
,
inputDir
);
exit
(
0
);
}
}
static
void
shellSourceFile
(
TAOS
*
con
,
char
*
fptr
)
{
wordexp_t
full_path
;
int
read_len
=
0
;
char
*
cmd
=
malloc
(
MAX_COMMAND_SIZE
);
size_t
cmd_len
=
0
;
char
*
line
=
NULL
;
size_t
line_len
=
0
;
if
(
wordexp
(
fptr
,
&
full_path
,
0
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: illegal file name
\n
"
);
return
;
}
char
*
fname
=
full_path
.
we_wordv
[
0
];
if
(
access
(
fname
,
R_OK
)
==
-
1
)
{
fprintf
(
stderr
,
"ERROR: file %s is not readable
\n
"
,
fptr
);
wordfree
(
&
full_path
);
return
;
}
FILE
*
f
=
fopen
(
fname
,
"r"
);
if
(
f
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: failed to open file %s
\n
"
,
fname
);
wordfree
(
&
full_path
);
return
;
}
fprintf
(
stdout
,
"start to dispose file:%s
\n
"
,
fname
);
while
((
read_len
=
getline
(
&
line
,
&
line_len
,
f
))
!=
-
1
)
{
if
(
read_len
>=
MAX_COMMAND_SIZE
)
continue
;
line
[
--
read_len
]
=
'\0'
;
if
(
read_len
==
0
||
isCommentLine
(
line
))
{
// line starts with #
continue
;
}
if
(
line
[
read_len
-
1
]
==
'\\'
)
{
line
[
read_len
-
1
]
=
' '
;
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
cmd_len
+=
read_len
;
continue
;
}
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
if
(
taos_query
(
con
,
cmd
))
{
taos_error
(
con
);
}
memset
(
cmd
,
0
,
MAX_COMMAND_SIZE
);
cmd_len
=
0
;
}
free
(
cmd
);
if
(
line
)
free
(
line
);
wordfree
(
&
full_path
);
fclose
(
f
);
}
void
*
shellImportThreadFp
(
void
*
arg
)
{
ShellThreadObj
*
pThread
=
(
ShellThreadObj
*
)
arg
;
for
(
int
f
=
0
;
f
<
shellSQLFileNum
;
++
f
)
{
if
(
f
%
pThread
->
totalThreads
==
pThread
->
threadIndex
)
{
char
*
SQLFileName
=
shellSQLFiles
[
f
];
shellSourceFile
(
pThread
->
taos
,
SQLFileName
);
}
}
return
NULL
;
}
static
void
shellRunImportThreads
(
struct
arguments
*
args
)
{
pthread_attr_t
thattr
;
ShellThreadObj
*
threadObj
=
(
ShellThreadObj
*
)
calloc
(
args
->
threadNum
,
sizeof
(
ShellThreadObj
));
for
(
int
t
=
0
;
t
<
args
->
threadNum
;
++
t
)
{
ShellThreadObj
*
pThread
=
threadObj
+
t
;
pThread
->
threadIndex
=
t
;
pThread
->
totalThreads
=
args
->
threadNum
;
pThread
->
taos
=
taos_connect
(
args
->
host
,
args
->
user
,
args
->
password
,
args
->
database
,
tsMgmtShellPort
);
if
(
pThread
->
taos
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed connect to TDengine, error:%s
\n
"
,
pThread
->
threadIndex
,
taos_errstr
(
pThread
->
taos
));
exit
(
0
);
}
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pThread
->
threadID
),
&
thattr
,
shellImportThreadFp
,
(
void
*
)
pThread
)
!=
0
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed to start
\n
"
,
pThread
->
threadIndex
);
exit
(
0
);
}
}
for
(
int
t
=
0
;
t
<
args
->
threadNum
;
++
t
)
{
pthread_join
(
threadObj
[
t
].
threadID
,
NULL
);
}
for
(
int
t
=
0
;
t
<
args
->
threadNum
;
++
t
)
{
taos_close
(
threadObj
[
t
].
taos
);
}
free
(
threadObj
);
}
void
source_dir
(
TAOS
*
con
,
struct
arguments
*
args
)
{
shellGetDirectoryFileList
(
args
->
dir
);
int64_t
start
=
taosGetTimestampMs
();
if
(
shellTablesSQLFile
[
0
]
!=
0
)
{
shellSourceFile
(
con
,
shellTablesSQLFile
);
int64_t
end
=
taosGetTimestampMs
();
fprintf
(
stdout
,
"import %s finished, time spent %.2f seconds
\n
"
,
shellTablesSQLFile
,
(
end
-
start
)
/
1000
.
0
);
}
shellRunImportThreads
(
args
);
int64_t
end
=
taosGetTimestampMs
();
fprintf
(
stdout
,
"import %s finished, time spent %.2f seconds
\n
"
,
args
->
dir
,
(
end
-
start
)
/
1000
.
0
);
}
src/kit/shell/src/shellLinux.c
浏览文件 @
2b21a75a
...
...
@@ -40,6 +40,8 @@ static struct argp_option options[] = {
{
"commands"
,
's'
,
"COMMANDS"
,
0
,
"Commands to run without enter the shell."
},
{
"raw-time"
,
'r'
,
0
,
0
,
"Output time as uint64_t."
},
{
"file"
,
'f'
,
"FILE"
,
0
,
"Script to run without enter the shell."
},
{
"directory"
,
'D'
,
"DIRECTORY"
,
0
,
"Use multi-thread to import all SQL files in the directory separately."
},
{
"thread"
,
'T'
,
"THREADNUM"
,
0
,
"Number of threads when using multi-thread to import data."
},
{
"database"
,
'd'
,
"DATABASE"
,
0
,
"Database to use when connecting to the server."
},
{
"timezone"
,
't'
,
"TIMEZONE"
,
0
,
"Time zone of the shell, default is local."
},
{
0
}};
...
...
@@ -89,6 +91,17 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
strcpy
(
arguments
->
file
,
full_path
.
we_wordv
[
0
]);
wordfree
(
&
full_path
);
break
;
case
'D'
:
if
(
wordexp
(
arg
,
&
full_path
,
0
)
!=
0
)
{
fprintf
(
stderr
,
"Invalid path %s
\n
"
,
arg
);
return
-
1
;
}
strcpy
(
arguments
->
dir
,
full_path
.
we_wordv
[
0
]);
wordfree
(
&
full_path
);
break
;
case
'T'
:
arguments
->
threadNum
=
atoi
(
arg
);
break
;
case
'd'
:
arguments
->
database
=
arg
;
break
;
...
...
src/kit/shell/src/shellMain.c
浏览文件 @
2b21a75a
...
...
@@ -62,7 +62,19 @@ int checkVersion() {
}
// Global configurations
struct
arguments
args
=
{
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
false
,
false
,
"
\0
"
,
NULL
};
struct
arguments
args
=
{
.
host
=
NULL
,
.
password
=
NULL
,
.
user
=
NULL
,
.
database
=
NULL
,
.
timezone
=
NULL
,
.
is_raw_time
=
false
,
.
is_use_passwd
=
false
,
.
file
=
"
\0
"
,
.
dir
=
"
\0
"
,
.
threadNum
=
5
,
.
commands
=
NULL
};
/*
* Main function.
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录