Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
DolphinScheduler
提交
5d34d9e2
DolphinScheduler
项目概览
apache
/
DolphinScheduler
上一次同步 1 年多
通知
704
Star
9572
Fork
3514
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
5d34d9e2
编写于
7月 04, 2022
作者:
K
Kerwin
提交者:
GitHub
7月 04, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
cherry pick #9107 (#10756)
上级
367b29b7
变更
4
隐藏空白更改
内联
并排
Showing
4 changed files
with
221 additions
and
34 deletions
+221
-34
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
...lphinscheduler/api/service/impl/ResourcesServiceImpl.java
+78
-16
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
...dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
+7
-1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java
.../org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java
+94
-2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
...a/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+42
-15
未找到文件。
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
浏览文件 @
5d34d9e2
...
...
@@ -17,13 +17,10 @@
package
org.apache.dolphinscheduler.api.service.impl
;
import
com.baomidou.mybatisplus.core.metadata.IPage
;
import
com.baomidou.mybatisplus.extension.plugins.pagination.Page
;
import
com.fasterxml.jackson.databind.SerializationFeature
;
import
com.google.common.io.Files
;
import
org.apache.commons.beanutils.BeanMap
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang.StringUtils
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
ALIAS
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
CONTENT
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
JAR
;
import
org.apache.dolphinscheduler.api.dto.resources.ResourceComponent
;
import
org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter
;
import
org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor
;
...
...
@@ -40,10 +37,38 @@ import org.apache.dolphinscheduler.common.utils.FileUtils;
import
org.apache.dolphinscheduler.common.utils.HadoopUtils
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.apache.dolphinscheduler.common.utils.PropertyUtils
;
import
org.apache.dolphinscheduler.dao.entity.*
;
import
org.apache.dolphinscheduler.dao.mapper.*
;
import
org.apache.dolphinscheduler.dao.entity.Resource
;
import
org.apache.dolphinscheduler.dao.entity.ResourcesUser
;
import
org.apache.dolphinscheduler.dao.entity.Tenant
;
import
org.apache.dolphinscheduler.dao.entity.UdfFunc
;
import
org.apache.dolphinscheduler.dao.entity.User
;
import
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper
;
import
org.apache.dolphinscheduler.dao.mapper.ResourceMapper
;
import
org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper
;
import
org.apache.dolphinscheduler.dao.mapper.TenantMapper
;
import
org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper
;
import
org.apache.dolphinscheduler.dao.mapper.UserMapper
;
import
org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils
;
import
org.apache.dolphinscheduler.spi.enums.ResourceType
;
import
org.apache.commons.beanutils.BeanMap
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang.StringUtils
;
import
java.io.IOException
;
import
java.text.MessageFormat
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.UUID
;
import
java.util.regex.Matcher
;
import
java.util.stream.Collectors
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -52,13 +77,11 @@ import org.springframework.stereotype.Service;
import
org.springframework.transaction.annotation.Transactional
;
import
org.springframework.web.multipart.MultipartFile
;
import
java.io.IOException
;
import
java.text.MessageFormat
;
import
java.util.*
;
import
java.util.regex.Matcher
;
import
java.util.stream.Collectors
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.*;
import
com.baomidou.mybatisplus.core.metadata.IPage
;
import
com.baomidou.mybatisplus.extension.plugins.pagination.Page
;
import
com.fasterxml.jackson.databind.SerializationFeature
;
import
com.google.common.base.Joiner
;
import
com.google.common.io.Files
;
/**
* resources service impl
...
...
@@ -198,6 +221,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
try
{
resourcesMapper
.
insert
(
resource
);
updateParentResourceSize
(
resource
,
resource
.
getSize
());
putMsg
(
result
,
Status
.
SUCCESS
);
Map
<
Object
,
Object
>
dataMap
=
new
BeanMap
(
resource
);
Map
<
String
,
Object
>
resultMap
=
new
HashMap
<>();
...
...
@@ -221,6 +245,33 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return
result
;
}
/**
* update the folder's size of the resource
*
* @param resource the current resource
* @param size size
*/
private
void
updateParentResourceSize
(
Resource
resource
,
long
size
)
{
if
(
resource
.
getSize
()
>
0
)
{
String
[]
splits
=
resource
.
getFullName
().
split
(
"/"
);
for
(
int
i
=
1
;
i
<
splits
.
length
;
i
++)
{
String
parentFullName
=
Joiner
.
on
(
"/"
).
join
(
Arrays
.
copyOfRange
(
splits
,
0
,
i
));
if
(
StringUtils
.
isNotBlank
(
parentFullName
))
{
List
<
Resource
>
resources
=
resourcesMapper
.
queryResource
(
parentFullName
,
resource
.
getType
().
ordinal
());
if
(
CollectionUtils
.
isNotEmpty
(
resources
))
{
Resource
parentResource
=
resources
.
get
(
0
);
if
(
parentResource
.
getSize
()
+
size
>=
0
)
{
parentResource
.
setSize
(
parentResource
.
getSize
()
+
size
);
}
else
{
parentResource
.
setSize
(
0L
);
}
resourcesMapper
.
updateById
(
parentResource
);
}
}
}
}
}
/**
* check resource is exists
*
...
...
@@ -338,6 +389,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// updateResource data
Date
now
=
new
Date
();
long
originFileSize
=
resource
.
getSize
();
resource
.
setAlias
(
name
);
resource
.
setFileName
(
name
);
...
...
@@ -423,6 +475,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
throw
new
ServiceException
(
String
.
format
(
"delete resource: %s failed."
,
originFullName
));
}
}
updateParentResourceSize
(
resource
,
resource
.
getSize
()
-
originFileSize
);
return
result
;
}
...
...
@@ -705,11 +759,15 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String
hdfsFilename
=
HadoopUtils
.
getHdfsFileName
(
resource
.
getType
(),
tenantCode
,
resource
.
getFullName
());
//delete data in database
resourcesMapper
.
selectBatchIds
(
Arrays
.
asList
(
needDeleteResourceIdArray
)).
forEach
(
item
->
{
updateParentResourceSize
(
item
,
item
.
getSize
()
*
-
1
);
});
resourcesMapper
.
deleteIds
(
needDeleteResourceIdArray
);
resourceUserMapper
.
deleteResourceUserArray
(
0
,
needDeleteResourceIdArray
);
//delete file on hdfs
HadoopUtils
.
getInstance
().
delete
(
hdfsFilename
,
true
);
putMsg
(
result
,
Status
.
SUCCESS
);
return
result
;
...
...
@@ -901,6 +959,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
Resource
resource
=
new
Resource
(
pid
,
name
,
fullName
,
false
,
desc
,
name
,
loginUser
.
getId
(),
type
,
content
.
getBytes
().
length
,
now
,
now
);
resourcesMapper
.
insert
(
resource
);
updateParentResourceSize
(
resource
,
resource
.
getSize
());
putMsg
(
result
,
Status
.
SUCCESS
);
Map
<
Object
,
Object
>
dataMap
=
new
BeanMap
(
resource
);
...
...
@@ -995,10 +1054,13 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if
(
StringUtils
.
isEmpty
(
tenantCode
))
{
return
result
;
}
long
originFileSize
=
resource
.
getSize
();
resource
.
setSize
(
content
.
getBytes
().
length
);
resource
.
setUpdateTime
(
new
Date
());
resourcesMapper
.
updateById
(
resource
);
updateParentResourceSize
(
resource
,
resource
.
getSize
()
-
originFileSize
);
result
=
uploadContentToHdfs
(
resource
.
getFullName
(),
tenantCode
,
content
);
if
(!
result
.
getCode
().
equals
(
Status
.
SUCCESS
.
getCode
()))
{
throw
new
ServiceException
(
result
.
getMsg
());
...
...
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
浏览文件 @
5d34d9e2
...
...
@@ -75,6 +75,7 @@ public class DolphinSchedulerManager {
logger
.
info
(
"Start initializing the DolphinScheduler manager table structure"
);
upgradeDao
.
initSchema
();
}
public
void
upgradeDolphinScheduler
()
throws
IOException
{
// Gets a list of all upgrades
List
<
String
>
schemaList
=
SchemaUtils
.
getAllSchemaList
();
...
...
@@ -97,12 +98,13 @@ public class DolphinSchedulerManager {
}
// The target version of the upgrade
String
schemaVersion
=
""
;
String
currentVersion
=
version
;
for
(
String
schemaDir
:
schemaList
)
{
schemaVersion
=
schemaDir
.
split
(
"_"
)[
0
];
if
(
SchemaUtils
.
isAGreatVersion
(
schemaVersion
,
version
))
{
logger
.
info
(
"upgrade DolphinScheduler metadata version from {} to {}"
,
version
,
schemaVersion
);
logger
.
info
(
"Begin upgrading DolphinScheduler's table structure"
);
upgradeDao
.
upgradeDolphinScheduler
(
schemaDir
);
upgradeDao
.
upgradeDolphinScheduler
(
schemaDir
);
if
(
"1.3.0"
.
equals
(
schemaVersion
))
{
upgradeDao
.
upgradeDolphinSchedulerWorkerGroup
();
}
else
if
(
"1.3.2"
.
equals
(
schemaVersion
))
{
...
...
@@ -113,6 +115,10 @@ public class DolphinSchedulerManager {
version
=
schemaVersion
;
}
}
if
(
SchemaUtils
.
isAGreatVersion
(
"2.0.6"
,
currentVersion
)
&&
SchemaUtils
.
isAGreatVersion
(
SchemaUtils
.
getSoftVersion
(),
currentVersion
))
{
upgradeDao
.
upgradeDolphinSchedulerResourceFileSize
();
}
}
// Assign the value of the version field in the version table to the version of the product
...
...
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java
浏览文件 @
5d34d9e2
...
...
@@ -18,14 +18,21 @@
package
org.apache.dolphinscheduler.dao.upgrade
;
import
org.apache.dolphinscheduler.common.utils.ConnectionUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.apache.dolphinscheduler.spi.utils.StringUtils
;
import
java.sql.Connection
;
import
java.sql.PreparedStatement
;
import
java.sql.ResultSet
;
import
java.sql.SQLException
;
import
java.util.Arrays
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Objects
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.google.common.base.Joiner
;
/**
* resource dao
...
...
@@ -65,4 +72,89 @@ public class ResourceDao {
return
resourceMap
;
}
/**
 * List all non-directory resources of the given type and aggregate their sizes
 * per ancestor folder.
 *
 * @param conn connection (not closed here; the caller owns it)
 * @param type resource type ordinal (0 = file, 1 = udf)
 * @return map whose key is a folder's full_name and whose value is the total
 *         size (bytes) of the files beneath it
 */
private Map<String, Long> listAllResourcesByFileType(Connection conn, int type) {
    Map<String, Long> resourceSizeMap = new HashMap<>();
    // Bind the type as a query parameter instead of formatting it into the SQL text.
    String sql = "SELECT full_name, type, size, is_directory FROM t_ds_resources where type = ?";
    // try-with-resources closes the ResultSet and PreparedStatement even on failure.
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        pstmt.setInt(1, type);
        try (ResultSet rs = pstmt.executeQuery()) {
            while (rs.next()) {
                String fullName = rs.getString("full_name");
                boolean isDirectory = rs.getBoolean("is_directory");
                long fileSize = rs.getLong("size");
                if (StringUtils.isNotBlank(fullName) && !isDirectory) {
                    String[] splits = fullName.split("/");
                    // Accumulate this file's size into every ancestor folder path.
                    for (int i = 1; i < splits.length; i++) {
                        String parentFullName = Joiner.on("/").join(Arrays.copyOfRange(splits, 0, splits.length - i));
                        if (StringUtils.isNotEmpty(parentFullName)) {
                            resourceSizeMap.merge(parentFullName, fileSize, Long::sum);
                        }
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        throw new RuntimeException("sql: " + sql, e);
    }
    return resourceSizeMap;
}
/**
 * Recompute and persist the size of every folder of the given resource type.
 * <p>
 * Aggregates file sizes per folder via {@link #listAllResourcesByFileType} and
 * writes them back in a single JDBC batch. The connection is always released
 * when this method returns.
 *
 * @param conn connection (released here via ConnectionUtils in all cases)
 * @param type resource type ordinal (0 = file, 1 = udf)
 */
public void updateResourceFolderSizeByFileType(Connection conn, int type) {
    Map<String, Long> resourceSizeMap = listAllResourcesByFileType(conn, type);
    String sql = "UPDATE t_ds_resources SET size=? where type=? and full_name=? and is_directory = true";
    // try-with-resources replaces the manual isClosed()/close() bookkeeping.
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        for (Map.Entry<String, Long> entry : resourceSizeMap.entrySet()) {
            pstmt.setLong(1, entry.getValue());
            pstmt.setInt(2, type);
            pstmt.setString(3, entry.getKey());
            pstmt.addBatch();
        }
        pstmt.executeBatch();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        throw new RuntimeException("sql: " + sql, e);
    } finally {
        // The connection was handed to us to consume; release it unconditionally.
        ConnectionUtils.releaseResource(conn);
    }
}
}
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
浏览文件 @
5d34d9e2
...
...
@@ -17,15 +17,12 @@
package
org.apache.dolphinscheduler.dao.upgrade
;
import
com.fasterxml.jackson.core.type.TypeReference
;
import
com.fasterxml.jackson.databind.JsonNode
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.node.ArrayNode
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.dolphinscheduler.common.Constants
;
import
org.apache.dolphinscheduler.common.enums.*
;
import
org.apache.dolphinscheduler.common.enums.ConditionType
;
import
org.apache.dolphinscheduler.common.enums.Flag
;
import
org.apache.dolphinscheduler.common.enums.Priority
;
import
org.apache.dolphinscheduler.common.enums.TaskType
;
import
org.apache.dolphinscheduler.common.enums.TimeoutFlag
;
import
org.apache.dolphinscheduler.common.process.ResourceInfo
;
import
org.apache.dolphinscheduler.common.task.TaskTimeoutParameter
;
import
org.apache.dolphinscheduler.common.utils.CodeGenerateUtils
;
...
...
@@ -37,12 +34,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import
org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog
;
import
org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog
;
import
org.apache.dolphinscheduler.spi.enums.DbType
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.core.io.ClassPathResource
;
import
org.springframework.core.io.Resource
;
import
javax.sql.DataSource
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang.StringUtils
;
import
java.io.FileNotFoundException
;
import
java.io.IOException
;
import
java.io.InputStreamReader
;
...
...
@@ -51,9 +46,27 @@ import java.sql.Connection;
import
java.sql.PreparedStatement
;
import
java.sql.ResultSet
;
import
java.sql.SQLException
;
import
java.util.*
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Optional
;
import
java.util.stream.Collectors
;
import
javax.sql.DataSource
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.core.io.ClassPathResource
;
import
org.springframework.core.io.Resource
;
import
com.fasterxml.jackson.core.type.TypeReference
;
import
com.fasterxml.jackson.databind.JsonNode
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.node.ArrayNode
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
public
abstract
class
UpgradeDao
{
public
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
UpgradeDao
.
class
);
private
static
final
String
T_VERSION_NAME
=
"t_escheduler_version"
;
...
...
@@ -150,6 +163,21 @@ public abstract class UpgradeDao {
upgradeDolphinSchedulerDDL
(
schemaDir
,
"dolphinscheduler_ddl_post.sql"
);
}
/**
 * Upgrade step for DolphinScheduler 2.0.6: back-fill the size column of
 * resource folders for both file-type and udf-type resources.
 * <p>
 * Failures are logged and swallowed so the rest of the upgrade can proceed.
 */
public void upgradeDolphinSchedulerResourceFileSize() {
    ResourceDao resourceDao = new ResourceDao();
    try {
        // update the size of the folder that is the type of file.
        resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0);
        // update the size of the folder that is the type of udf.
        resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1);
    } catch (Exception ex) {
        // Pass the throwable so the stack trace is not silently dropped.
        logger.error("Failed to upgrade because of failing to update the folder's size of resource files.", ex);
    }
}
/**
* updateProcessDefinitionJsonWorkerGroup
*/
...
...
@@ -344,7 +372,6 @@ public abstract class UpgradeDao {
}
}
/**
* update version
*
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录