Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
lhongjum2003
DolphinScheduler
提交
fb5c8646
DolphinScheduler
项目概览
lhongjum2003
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
fb5c8646
编写于
7月 22, 2019
作者:
journey2018
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
the resource upload switch is switched, and the file system is consistent.
上级
7b0ff0ca
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
71 addition
and
43 deletion
+71
-43
escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java
.../src/main/java/cn/escheduler/api/service/BaseService.java
+17
-0
escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java
...rc/main/java/cn/escheduler/api/service/LoggerService.java
+8
-7
escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java
...c/main/java/cn/escheduler/api/service/ProjectService.java
+0
-6
escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java
...main/java/cn/escheduler/api/service/ResourcesService.java
+7
-8
escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java
...rc/main/java/cn/escheduler/api/service/TenantService.java
+2
-8
escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java
...src/main/java/cn/escheduler/api/service/UsersService.java
+22
-9
escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java
...in/java/cn/escheduler/api/service/WorkerGroupService.java
+1
-1
escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java
...src/main/java/cn/escheduler/common/utils/HadoopUtils.java
+14
-4
未找到文件。
escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java
浏览文件 @
fb5c8646
...
...
@@ -20,6 +20,7 @@ import cn.escheduler.api.enums.Status;
import
cn.escheduler.api.utils.Constants
;
import
cn.escheduler.api.utils.Result
;
import
cn.escheduler.common.enums.UserType
;
import
cn.escheduler.common.utils.HadoopUtils
;
import
cn.escheduler.dao.model.User
;
import
org.apache.commons.lang3.StringUtils
;
...
...
@@ -110,4 +111,20 @@ public class BaseService {
return
null
;
}
/**
* create tenant dir if not exists
* @param tenantCode
* @throws Exception
*/
protected
void
createTenantDirIfNotExists
(
String
tenantCode
)
throws
Exception
{
String
resourcePath
=
HadoopUtils
.
getHdfsResDir
(
tenantCode
);
String
udfsPath
=
HadoopUtils
.
getHdfsUdfDir
(
tenantCode
);
/**
* init resource path and udf path
*/
HadoopUtils
.
getInstance
().
mkdir
(
resourcePath
);
HadoopUtils
.
getInstance
().
mkdir
(
udfsPath
);
}
}
escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java
浏览文件 @
fb5c8646
...
...
@@ -49,21 +49,22 @@ public class LoggerService {
*/
public
Result
queryLog
(
int
taskInstId
,
int
skipLineNum
,
int
limit
)
{
TaskInstance
taskInstance
=
processDao
.
findTaskInstanceById
(
taskInstId
);
String
host
=
taskInstance
.
getHost
();
if
(
StringUtils
.
isEmpty
(
host
)){
return
new
Result
(
Status
.
TASK_INSTANCE_HOST_NOT_FOUND
.
getCode
(),
Status
.
TASK_INSTANCE_HOST_NOT_FOUND
.
getMsg
());
}
logger
.
info
(
"log host : {} , logPath : {} , logServer port : {}"
,
host
,
taskInstance
.
getLogPath
(),
Constants
.
RPC_PORT
);
Result
result
=
new
Result
(
Status
.
SUCCESS
.
getCode
(),
Status
.
SUCCESS
.
getMsg
());
if
(
host
!=
null
){
LogClient
logClient
=
new
LogClient
(
host
,
Constants
.
RPC_PORT
);
String
log
=
logClient
.
rollViewLog
(
taskInstance
.
getLogPath
(),
skipLineNum
,
limit
);
result
.
setData
(
log
);
logger
.
info
(
log
);
}
logger
.
info
(
"log host : {} , logPath : {} , logServer port : {}"
,
host
,
taskInstance
.
getLogPath
(),
Constants
.
RPC_PORT
);
LogClient
logClient
=
new
LogClient
(
host
,
Constants
.
RPC_PORT
);
String
log
=
logClient
.
rollViewLog
(
taskInstance
.
getLogPath
(),
skipLineNum
,
limit
);
result
.
setData
(
log
);
logger
.
info
(
log
);
return
result
;
}
...
...
escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java
浏览文件 @
fb5c8646
...
...
@@ -45,12 +45,6 @@ public class ProjectService extends BaseService{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ProjectService
.
class
);
@Autowired
private
UserMapper
userMapper
;
@Autowired
private
UsersService
userService
;
@Autowired
private
ProjectMapper
projectMapper
;
...
...
escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java
浏览文件 @
fb5c8646
...
...
@@ -339,19 +339,18 @@ public class ResourcesService extends BaseService {
String
resourcePath
=
""
;
if
(
type
.
equals
(
ResourceType
.
FILE
))
{
hdfsFilename
=
HadoopUtils
.
getHdfsFilename
(
tenantCode
,
name
);
resourcePath
=
HadoopUtils
.
getHdfsDir
(
tenantCode
);
resourcePath
=
HadoopUtils
.
getHdfs
Res
Dir
(
tenantCode
);
}
else
if
(
type
.
equals
(
ResourceType
.
UDF
))
{
hdfsFilename
=
HadoopUtils
.
getHdfsUdfFilename
(
tenantCode
,
name
);
resourcePath
=
HadoopUtils
.
getHdfsUdfDir
(
tenantCode
);
}
try
{
if
(
HadoopUtils
.
getInstance
().
exists
(
resourcePath
))
{
cn
.
escheduler
.
api
.
utils
.
FileUtils
.
copyFile
(
file
,
localFilename
);
HadoopUtils
.
getInstance
().
copyLocalToHdfs
(
localFilename
,
hdfsFilename
,
true
,
true
);
}
else
{
logger
.
error
(
"{} is not exist"
,
resourcePath
);
return
false
;
// if tenant dir not exists
if
(!
HadoopUtils
.
getInstance
().
exists
(
resourcePath
))
{
createTenantDirIfNotExists
(
tenantCode
);
}
cn
.
escheduler
.
api
.
utils
.
FileUtils
.
copyFile
(
file
,
localFilename
);
HadoopUtils
.
getInstance
().
copyLocalToHdfs
(
localFilename
,
hdfsFilename
,
true
,
true
);
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
return
false
;
...
...
@@ -678,7 +677,7 @@ public class ResourcesService extends BaseService {
// get file hdfs path
hdfsFileName
=
HadoopUtils
.
getHdfsFilename
(
tenantCode
,
resourceName
);
String
resourcePath
=
HadoopUtils
.
getHdfsDir
(
tenantCode
);
String
resourcePath
=
HadoopUtils
.
getHdfs
Res
Dir
(
tenantCode
);
logger
.
info
(
"resource hdfs path is {} "
,
hdfsFileName
);
HadoopUtils
hadoopUtils
=
HadoopUtils
.
getInstance
();
...
...
escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java
浏览文件 @
fb5c8646
...
...
@@ -97,13 +97,7 @@ public class TenantService extends BaseService{
// if hdfs startup
if
(
PropertyUtils
.
getResUploadStartupState
()){
String
resourcePath
=
HadoopUtils
.
getHdfsDataBasePath
()
+
"/"
+
tenantCode
+
"/resources"
;
String
udfsPath
=
HadoopUtils
.
getHdfsUdfDir
(
tenantCode
);
/**
* init resource path and udf path
*/
HadoopUtils
.
getInstance
().
mkdir
(
resourcePath
);
HadoopUtils
.
getInstance
().
mkdir
(
udfsPath
);
createTenantDirIfNotExists
(
tenantCode
);
}
putMsg
(
result
,
Status
.
SUCCESS
);
...
...
@@ -240,7 +234,7 @@ public class TenantService extends BaseService{
String
tenantPath
=
HadoopUtils
.
getHdfsDataBasePath
()
+
"/"
+
tenant
.
getTenantCode
();
if
(
HadoopUtils
.
getInstance
().
exists
(
tenantPath
)){
String
resourcePath
=
HadoopUtils
.
getHdfsDir
(
tenant
.
getTenantCode
());
String
resourcePath
=
HadoopUtils
.
getHdfs
Res
Dir
(
tenant
.
getTenantCode
());
FileStatus
[]
fileStatus
=
HadoopUtils
.
getInstance
().
listFileStatus
(
resourcePath
);
if
(
fileStatus
.
length
>
0
)
{
putMsg
(
result
,
Status
.
HDFS_TERANT_RESOURCES_FILE_EXISTS
);
...
...
escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java
浏览文件 @
fb5c8646
...
...
@@ -124,10 +124,13 @@ public class UsersService extends BaseService {
userMapper
.
insert
(
user
);
Tenant
tenant
=
tenantMapper
.
queryById
(
tenantId
);
//
if hdfs
startup
//
resource upload
startup
if
(
PropertyUtils
.
getResUploadStartupState
()){
String
userPath
=
HadoopUtils
.
getHdfsDataBasePath
()
+
"/"
+
tenant
.
getTenantCode
()
+
"/home/"
+
user
.
getId
();
// if tenant not exists
if
(!
HadoopUtils
.
getInstance
().
exists
(
HadoopUtils
.
getHdfsTenantDir
(
tenant
.
getTenantCode
()))){
createTenantDirIfNotExists
(
tenant
.
getTenantCode
());
}
String
userPath
=
HadoopUtils
.
getHdfsUserDir
(
tenant
.
getTenantCode
(),
user
.
getId
());
HadoopUtils
.
getInstance
().
mkdir
(
userPath
);
}
...
...
@@ -247,11 +250,12 @@ public class UsersService extends BaseService {
// if hdfs startup
if
(
PropertyUtils
.
getResUploadStartupState
()
&&
oldTenant
!=
null
){
String
newTenantCode
=
newTenant
.
getTenantCode
();
String
oldResourcePath
=
HadoopUtils
.
getHdfs
DataBasePath
()
+
"/"
+
oldTenant
.
getTenantCode
()
+
"/resources"
;
String
oldResourcePath
=
HadoopUtils
.
getHdfs
ResDir
(
oldTenant
.
getTenantCode
())
;
String
oldUdfsPath
=
HadoopUtils
.
getHdfsUdfDir
(
oldTenant
.
getTenantCode
());
// if old tenant dir exists
if
(
HadoopUtils
.
getInstance
().
exists
(
oldResourcePath
)){
String
newResourcePath
=
HadoopUtils
.
getHdfs
DataBasePath
()
+
"/"
+
newTenantCode
+
"/resources"
;
String
newResourcePath
=
HadoopUtils
.
getHdfs
ResDir
(
newTenantCode
)
;
String
newUdfsPath
=
HadoopUtils
.
getHdfsUdfDir
(
newTenantCode
);
//file resources list
...
...
@@ -271,13 +275,22 @@ public class UsersService extends BaseService {
}
//Delete the user from the old tenant directory
String
oldUserPath
=
HadoopUtils
.
getHdfs
DataBasePath
()
+
"/"
+
oldTenant
.
getTenantCode
()
+
"/home/"
+
userId
;
String
oldUserPath
=
HadoopUtils
.
getHdfs
UserDir
(
oldTenant
.
getTenantCode
(),
userId
)
;
HadoopUtils
.
getInstance
().
delete
(
oldUserPath
,
true
);
}
else
{
// if old tenant dir not exists , create
createTenantDirIfNotExists
(
oldTenant
.
getTenantCode
());
}
if
(
HadoopUtils
.
getInstance
().
exists
(
HadoopUtils
.
getHdfsTenantDir
(
newTenant
.
getTenantCode
()))){
//create user in the new tenant directory
String
newUserPath
=
HadoopUtils
.
getHdfsUserDir
(
newTenant
.
getTenantCode
(),
user
.
getId
());
HadoopUtils
.
getInstance
().
mkdir
(
newUserPath
);
}
else
{
// if new tenant dir not exists , create
createTenantDirIfNotExists
(
newTenant
.
getTenantCode
());
}
//create user in the new tenant directory
String
newUserPath
=
HadoopUtils
.
getHdfsDataBasePath
()
+
"/"
+
newTenant
.
getTenantCode
()
+
"/home/"
+
user
.
getId
();
HadoopUtils
.
getInstance
().
mkdir
(
newUserPath
);
}
}
user
.
setTenantId
(
tenantId
);
...
...
escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java
浏览文件 @
fb5c8646
...
...
@@ -136,7 +136,7 @@ public class WorkerGroupService extends BaseService {
Map
<
String
,
Object
>
result
=
new
HashMap
<>(
5
);
int
delete
=
workerGroupMapper
.
deleteById
(
id
);
workerGroupMapper
.
deleteById
(
id
);
putMsg
(
result
,
Status
.
SUCCESS
);
return
result
;
}
...
...
escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java
浏览文件 @
fb5c8646
...
...
@@ -410,12 +410,22 @@ public class HadoopUtils implements Closeable {
* @param tenantCode tenant code
* @return hdfs resource dir
*/
public
static
String
getHdfsDir
(
String
tenantCode
)
{
public
static
String
getHdfs
Res
Dir
(
String
tenantCode
)
{
return
String
.
format
(
"%s/resources"
,
getHdfsTenantDir
(
tenantCode
));
}
/**
* get udf dir on hdfs
* hdfs user dir
*
* @param tenantCode tenant code
* @return hdfs resource dir
*/
public
static
String
getHdfsUserDir
(
String
tenantCode
,
int
userId
)
{
return
String
.
format
(
"%s/home/%d"
,
getHdfsTenantDir
(
tenantCode
),
userId
);
}
/**
* hdfs udf dir
*
* @param tenantCode tenant code
* @return get udf dir on hdfs
...
...
@@ -432,7 +442,7 @@ public class HadoopUtils implements Closeable {
* @return get absolute path and name for file on hdfs
*/
public
static
String
getHdfsFilename
(
String
tenantCode
,
String
filename
)
{
return
String
.
format
(
"%s/%s"
,
getHdfsDir
(
tenantCode
),
filename
);
return
String
.
format
(
"%s/%s"
,
getHdfs
Res
Dir
(
tenantCode
),
filename
);
}
/**
...
...
@@ -449,7 +459,7 @@ public class HadoopUtils implements Closeable {
/**
* @return file directory of tenants on hdfs
*/
p
rivate
static
String
getHdfsTenantDir
(
String
tenantCode
)
{
p
ublic
static
String
getHdfsTenantDir
(
String
tenantCode
)
{
return
String
.
format
(
"%s/%s"
,
getHdfsDataBasePath
(),
tenantCode
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录