ghsby / o2oa
Forked from 浙江兰德纵横网络技术股份有限公司 / o2oa (2,880 commits behind the fork source)
Star 1 · Fork 0
Commit 34df7c5a
Authored Jun 30, 2021 by o2null

Merge branch 'feature/hdfs' into 'wrdp'

Feature/hdfs See merge request o2oa/o2oa!4279

Parents: f5f2efe0, 4c856ccc
Showing 7 changed files with 409 additions and 157 deletions (+409 −157)
o2server/pom.xml (+12 −3)
o2server/x_base_core_project/src/main/java/com/x/base/core/entity/StorageObject.java (+213 −126)
o2server/x_base_core_project/src/main/java/com/x/base/core/entity/StorageProtocol.java (+1 −1)
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Config.java (+125 −5)
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/ExternalStorageSource.java (+24 −22)
o2server/x_console/src/main/java/com/x/server/console/Hadoop.java (+32 −0)
o2server/x_console/src/main/java/com/x/server/console/Main.java (+2 −0)
o2server/pom.xml

@@ -366,6 +366,10 @@
 		<groupId>com.github.whvcse</groupId>
 		<artifactId>easy-captcha</artifactId>
 	</dependency>
+	<dependency>
+		<groupId>org.apache.hadoop</groupId>
+		<artifactId>hadoop-client</artifactId>
+	</dependency>
 </dependencies>
 <build>
@@ -410,7 +414,7 @@
 			</compilerArgs>
 		</configuration>
 	</plugin>
-	<!--This plugin's configuration is used to store Eclipse m2e settings
+	<!--This plugin's configuration is used to store Eclipse m2e settings
 		only. It has no influence on the Maven build itself. -->
 	<plugin>
 		<groupId>org.eclipse.m2e</groupId>
@@ -854,7 +858,7 @@
 		<artifactId>javapoet</artifactId>
 		<version>1.11.1</version>
 	</dependency>
-	<!--dependency> <groupId>javax.visrec</groupId> <artifactId>visrec-api</artifactId>
+	<!--dependency> <groupId>javax.visrec</groupId> <artifactId>visrec-api</artifactId>
 		<version>20200316</version> </dependency -->
 	<dependency>
 		<groupId>com.github.neuroph</groupId>
@@ -886,7 +890,7 @@
 		<artifactId>slf4j-api</artifactId>
 		<version>1.7.25</version>
 	</dependency>
-	<!-- dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId>
+	<!-- dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId>
 		<version>3.2.0</version> </dependency -->
 	<dependency>
 		<groupId>com.alibaba</groupId>
@@ -1093,6 +1097,11 @@
 		<artifactId>easy-captcha</artifactId>
 		<version>1.6.2</version>
 	</dependency>
+	<dependency>
+		<groupId>org.apache.hadoop</groupId>
+		<artifactId>hadoop-client</artifactId>
+		<version>3.3.1</version>
+	</dependency>
 </dependencies>
 </dependencyManagement>
 <repositories>
o2server/x_base_core_project/src/main/java/com/x/base/core/entity/StorageObject.java

@@ -2,8 +2,10 @@ package com.x.base.core.entity;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.Date;
 import java.util.Objects;
@@ -33,18 +35,18 @@ import com.x.base.core.project.tools.DefaultCharset;
 @MappedSuperclass
 public abstract class StorageObject extends SliceJpaObject {

-    private static FileSystemManager FILESYSTEMANAGERINSTANCE;
+    private static FileSystemManager fileSystemManagerInstance;

-    private synchronized FileSystemManager getFileSystemManager() throws Exception {
-        if (FILESYSTEMANAGERINSTANCE == null) {
+    private synchronized FileSystemManager getFileSystemManager() throws FileSystemException {
+        if (fileSystemManagerInstance == null) {
             StandardFileSystemManager fs = new StandardFileSystemManager();
             fs.setFilesCache(new NullFilesCache());
             fs.setCacheStrategy(CacheStrategy.ON_RESOLVE);
             fs.init();
-            FILESYSTEMANAGERINSTANCE = fs;
+            fileSystemManagerInstance = fs;
         }
-        return FILESYSTEMANAGERINSTANCE;
+        return fileSystemManagerInstance;
     }

     private static final long serialVersionUID = 7823729771901802653L;
@@ -53,33 +55,35 @@ public abstract class StorageObject extends SliceJpaObject {
     public static final String DELETE_OPERATE = "delete";

-    abstract public String path() throws Exception;
+    public abstract String path() throws Exception;

-    abstract public String getStorage();
+    public abstract String getStorage();

-    abstract public void setStorage(String storage);
+    public abstract void setStorage(String storage);

-    abstract public Long getLength();
+    public abstract Long getLength();

-    abstract public void setLength(Long length);
+    public abstract void setLength(Long length);

-    abstract public String getName();
+    public abstract String getName();

-    abstract public void setName(String name);
+    public abstract void setName(String name);

-    abstract public String getExtension();
+    public abstract String getExtension();

-    abstract public void setExtension(String extension);
+    public abstract void setExtension(String extension);

-    abstract public Date getLastUpdateTime();
+    public abstract Date getLastUpdateTime();

-    abstract public void setLastUpdateTime(Date lastUpdateTime);
+    public abstract void setLastUpdateTime(Date lastUpdateTime);

-    abstract public Boolean getDeepPath();
+    public abstract Boolean getDeepPath();

-    abstract public void setDeepPath(Boolean deepPath);
+    public abstract void setDeepPath(Boolean deepPath);

-    public String path(String operate) throws Exception { return this.path();}
+    public String path(String operate) throws Exception {
+        return this.path();
+    }

     @Transient
     private byte[] bytes;
@@ -148,44 +152,16 @@ public abstract class StorageObject extends SliceJpaObject {
     /** Update the stored content. */
     public Long updateContent(StorageMapping mapping, byte[] bytes) throws Exception {
-        long length = -1L;
-        FileSystemManager manager = this.getFileSystemManager();
-        String prefix = this.getPrefix(mapping);
-        String path = this.path();
-        if (StringUtils.isEmpty(path)) {
-            throw new Exception("path can not be empty.");
-        }
-        FileSystemOptions options = this.getOptions(mapping);
-        /*
-         * Two attempts are required. With nginx dispatching requests in front, several
-         * files may be uploaded concurrently and may try to create the storage
-         * directory at the same time, which fails on the backend with
-         * org.apache.commons.vfs2.FileSystemException: Could not create folder
-         * "ftp://processPlatform:***@o2.server01.com:20040/20200601/1beb018a-5009-4baa-a9ef-7e903f9d48ef".
-         * Re-issuing the request and fetching the file again resolves this.
-         */
-        for (int i = 0; i < 2; i++) {
-            try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options);
-                    OutputStream output = fo.getContent().getOutputStream()) {
-                length = IOUtils.copyLarge(new ByteArrayInputStream(bytes), output);
-                this.setLength(length);
-                if ((!Objects.equals(StorageProtocol.webdav, mapping.getProtocol()))
-                        && (!Objects.equals(StorageProtocol.sftp, mapping.getProtocol()))) {
-                    /* Closing webdav would also try to close commons.httpClient. */
-                    manager.closeFileSystem(fo.getFileSystem());
-                }
-                this.setStorage(mapping.getName());
-                this.setLastUpdateTime(new Date());
-                break;
-            } catch (FileSystemException fse) {
-                if (i != 0) {
-                    // Skip the first failure and retry; if the second attempt also fails, rethrow.
-                    throw fse;
-                }
-            }
-        }
-        return length;
+        if (Objects.equals(StorageProtocol.hdfs, mapping.getProtocol())) {
+            return this.hdfsUpdateContent(mapping, bytes);
+        } else {
+            return this.vfsUpdateContent(mapping, bytes);
+        }
     }

-    /** Read the content. */
+    // Read the content.
     public byte[] readContent(StorageMapping mapping) throws Exception {
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
             readContent(mapping, baos);
@@ -193,113 +169,70 @@ public abstract class StorageObject extends SliceJpaObject {
         }
     }

-    /** Stream the content to the given output. */
+    // Stream the content to the given output.
     public Long readContent(StorageMapping mapping, OutputStream output) throws Exception {
-        long length = -1L;
-        FileSystemManager manager = this.getFileSystemManager();
-        String prefix = this.getPrefix(mapping);
-        String path = this.path();
-        FileSystemOptions options = this.getOptions(mapping);
-        try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
-            if (fo.exists() && fo.isFile()) {
-                try (InputStream input = fo.getContent().getInputStream()) {
-                    length = IOUtils.copyLarge(input, output);
-                }
-            } else {
-                throw new Exception(fo.getPublicURIString() + " not existed, object:" + this.toString() + ".");
-            }
-            // manager.closeFileSystem(fo.getFileSystem());
-            if (!Objects.equals(StorageProtocol.webdav, mapping.getProtocol())) {
-                /* Closing webdav would also try to close commons.httpClient. */
-                manager.closeFileSystem(fo.getFileSystem());
-            }
-        }
-        return length;
+        if (Objects.equals(mapping.getProtocol(), StorageProtocol.hdfs)) {
+            return hdfsReadContent(mapping, output);
+        } else {
+            return vfsReadContent(mapping, output);
+        }
     }

-    /** Check whether the content exists. */
+    // Check whether the content exists.
     public boolean existContent(StorageMapping mapping) throws Exception {
-        FileSystemManager manager = this.getFileSystemManager();
-        String prefix = this.getPrefix(mapping);
-        String path = this.path();
-        FileSystemOptions options = this.getOptions(mapping);
-        try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
-            if (fo.exists() && fo.isFile()) {
-                return true;
-            }
-            return false;
-        }
+        if (Objects.equals(mapping.getProtocol(), StorageProtocol.hdfs)) {
+            return hdfsExistContent(mapping);
+        } else {
+            return vfsExistContent(mapping);
+        }
     }

     /** Delete the content; if the immediate parent directory (checked one level up only) is empty, delete it as well. */
     public void deleteContent(StorageMapping mapping) throws Exception {
-        FileSystemManager manager = this.getFileSystemManager();
-        String prefix = this.getPrefix(mapping);
-        String path = this.path(DELETE_OPERATE);
-        FileSystemOptions options = this.getOptions(mapping);
-        try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
-            if (fo.exists() && fo.isFile()) {
-                fo.delete();
-                if ((!StringUtils.startsWith(path, PATHSEPARATOR)) && (StringUtils.contains(path, PATHSEPARATOR))) {
-                    FileObject parent = fo.getParent();
-                    if ((null != parent) && parent.exists() && parent.isFolder()) {
-                        if (parent.getChildren().length == 0) {
-                            parent.delete();
-                        }
-                    }
-                }
-            }
-            // manager.closeFileSystem(fo.getFileSystem());
-            if (!Objects.equals(StorageProtocol.webdav, mapping.getProtocol())) {
-                /* Closing webdav would also try to close commons.httpClient. */
-                manager.closeFileSystem(fo.getFileSystem());
-            }
-        }
+        if (Objects.equals(mapping.getProtocol(), StorageProtocol.hdfs)) {
+            hdfsDeleteContent(mapping);
+        } else {
+            vfsDeleteContent(mapping);
+        }
     }

-    /** Get the leading part of the full access path. */
-    private String getPrefix(StorageMapping mapping) throws Exception {
+    // Get the leading part of the full access path.
+    private String getPrefix(StorageMapping mapping) throws IllegalStateException, UnsupportedEncodingException {
         String prefix = "";
         if (null == mapping.getProtocol()) {
-            throw new Exception("storage protocol is null.");
+            throw new IllegalStateException("storage protocol is null.");
         }
         switch (mapping.getProtocol()) {
         // bzip2, file, ftp, ftps, gzip, hdfs, http, https, jar, ram, res, sftp,
         // tar, temp, webdav, zip, cifs, mime;
         case ftp:
             // ftp://[ username[: password]@] hostname[: port][ relative-path]
             prefix = "ftp://" + URLEncoder.encode(mapping.getUsername(), DefaultCharset.name) + ":"
                     + URLEncoder.encode(mapping.getPassword(), DefaultCharset.name) + "@" + mapping.getHost() + ":"
                     + mapping.getPort();
             break;
         case ftps:
             // ftps://[ username[: password]@] hostname[: port][ relative-path]
             prefix = "ftps://" + URLEncoder.encode(mapping.getUsername(), DefaultCharset.name) + ":"
                     + URLEncoder.encode(mapping.getPassword(), DefaultCharset.name) + "@" + mapping.getHost() + ":"
                     + mapping.getPort();
             break;
         case sftp:
             // sftp://[ username[: password]@] hostname[: port][ relative-path]
             prefix = "sftp://" + URLEncoder.encode(mapping.getUsername(), DefaultCharset.name) + ":"
                     + URLEncoder.encode(mapping.getPassword(), DefaultCharset.name) + "@" + mapping.getHost() + ":"
                     + mapping.getPort();
             break;
         case cifs:
             // smb://[ username[: password]@] hostname[: port][ absolute-path]
             prefix = "smb://" + URLEncoder.encode(mapping.getUsername(), DefaultCharset.name) + ":"
                     + URLEncoder.encode(mapping.getPassword(), DefaultCharset.name) + "@" + mapping.getHost() + ":"
                     + mapping.getPort();
             break;
         case webdav:
             // webdav://[ username[: password]@] hostname[: port][ absolute-path]
             prefix = "webdav://" + URLEncoder.encode(mapping.getUsername(), DefaultCharset.name) + ":"
                     + URLEncoder.encode(mapping.getPassword(), DefaultCharset.name) + "@" + mapping.getHost() + ":"
                     + mapping.getPort();
             break;
         case file:
             // [file://] absolute-path
             prefix = "file://";
             break;
+        case hdfs:
+            // The path does not use the username-based home directory; return directly.
+            return StringUtils.isEmpty(mapping.getPrefix()) ? "/" : ("/" + mapping.getPrefix());
         default:
             break;
         }
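Every non-hdfs branch of getPrefix above splices URL-encoded credentials into the authority part of a `scheme://user:pass@host:port` VFS URI. The splicing can be sketched in isolation; the `buildPrefix` helper and the sample credentials below are illustrative stand-ins, not part of the commit:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class PrefixDemo {
    // Mirrors the ftp:// branch of getPrefix: username and password are
    // URL-encoded so characters like '@' or ':' cannot corrupt the
    // authority section of the URI.
    static String buildPrefix(String scheme, String username, String password, String host, int port)
            throws UnsupportedEncodingException {
        return scheme + "://" + URLEncoder.encode(username, "UTF-8") + ":"
                + URLEncoder.encode(password, "UTF-8") + "@" + host + ":" + port;
    }

    public static void main(String[] args) throws Exception {
        // '@' becomes %40 and ':' becomes %3A, so the host boundary stays unambiguous.
        System.out.println(buildPrefix("ftp", "process@Platform", "p:ss", "o2.server01.com", 20040));
        // → ftp://process%40Platform:p%3Ass@o2.server01.com:20040
    }
}
```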
@@ -309,15 +242,13 @@ public abstract class StorageObject extends SliceJpaObject {
     private FileSystemOptions getOptions(StorageMapping mapping) throws Exception {
         FileSystemOptions opts = new FileSystemOptions();
         if (null == mapping.getProtocol()) {
-            throw new Exception("storage protocol is null.");
+            throw new IllegalStateException("storage protocol is null.");
         }
         switch (mapping.getProtocol()) {
         // bzip2, file, ftp, ftps, gzip, hdfs, http, https, jar, ram, res, sftp,
         // tar, temp, webdav, zip, cifs, mime;
         case sftp:
             FtpFileSystemConfigBuilder sftpBuilder = FtpFileSystemConfigBuilder.getInstance();
             sftpBuilder.setPassiveMode(opts, Config.vfs().getSftp().getPassive());
-            /** Force-disable remote IP verification. */
+            // Force-disable remote IP verification.
             sftpBuilder.setRemoteVerification(opts, false);
             sftpBuilder.setFileType(opts, FtpFileType.BINARY);
             sftpBuilder.setConnectTimeout(opts, 10000);
@@ -341,9 +272,7 @@ public abstract class StorageObject extends SliceJpaObject {
          * java.net.Socket.connect(Socket.java:589)
          */
             ftpBuilder.setPassiveMode(opts, Config.vfs().getFtp().getPassive());
-            // builder.setPassiveMode(opts, false);
-            // builder.setPassiveMode(opts, true);
-            /** Force-disable remote IP verification. */
+            // Force-disable remote IP verification.
             ftpBuilder.setRemoteVerification(opts, false);
             // FtpFileType.BINARY is the default
             ftpBuilder.setFileType(opts, FtpFileType.BINARY);
@@ -354,7 +283,7 @@ public abstract class StorageObject extends SliceJpaObject {
         case ftps:
             FtpsFileSystemConfigBuilder ftpsBuilder = FtpsFileSystemConfigBuilder.getInstance();
             ftpsBuilder.setPassiveMode(opts, Config.vfs().getFtp().getPassive());
-            /** Force-disable remote IP verification. */
+            // Force-disable remote IP verification.
             ftpsBuilder.setRemoteVerification(opts, false);
             // FtpFileType.BINARY is the default
             ftpsBuilder.setFileType(opts, FtpFileType.BINARY);
@@ -373,7 +302,6 @@ public abstract class StorageObject extends SliceJpaObject {
             webdavBuilder.setMaxConnectionsPerHost(opts, 200);
             webdavBuilder.setMaxTotalConnections(opts, 200);
             webdavBuilder.setFollowRedirect(opts, true);
-            // webdavBuilder.setVersioning(opts, true);
             break;
         case file:
             break;
@@ -383,4 +311,163 @@ public abstract class StorageObject extends SliceJpaObject {
         return opts;
     }
+
+    private Long vfsUpdateContent(StorageMapping mapping, byte[] bytes) throws Exception {
+        String prefix = this.getPrefix(mapping);
+        String path = this.path();
+        if (StringUtils.isEmpty(path)) {
+            throw new IllegalStateException("path can not be empty.");
+        }
+        FileSystemOptions options = this.getOptions(mapping);
+        long length = -1L;
+        FileSystemManager manager = this.getFileSystemManager();
+        /*
+         * Two attempts are required. With nginx dispatching requests in front, several
+         * files may be uploaded concurrently and may try to create the storage
+         * directory at the same time, which fails on the backend with
+         * org.apache.commons.vfs2.FileSystemException: Could not create folder
+         * "ftp://processPlatform:***@o2.server01.com:20040/20200601/1beb018a-5009-4baa-a9ef-7e903f9d48ef".
+         * Re-issuing the request and fetching the file again resolves this.
+         */
+        for (int i = 0; i < 2; i++) {
+            try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options);
+                    OutputStream output = fo.getContent().getOutputStream()) {
+                length = IOUtils.copyLarge(new ByteArrayInputStream(bytes), output);
+                this.setLength(length);
+                if ((!Objects.equals(StorageProtocol.webdav, mapping.getProtocol()))
+                        && (!Objects.equals(StorageProtocol.sftp, mapping.getProtocol()))) {
+                    /* Closing webdav would also try to close commons.httpClient. */
+                    manager.closeFileSystem(fo.getFileSystem());
+                }
+                this.setStorage(mapping.getName());
+                this.setLastUpdateTime(new Date());
+                break;
+            } catch (FileSystemException fse) {
+                if (i != 0) {
+                    // Skip the first failure and retry; if the second attempt also fails, rethrow.
+                    throw fse;
+                }
+            }
+        }
+        return length;
+    }
+
+    // Read data via VFS.
+    private Long vfsReadContent(StorageMapping mapping, OutputStream output) throws Exception {
+        long length = -1L;
+        FileSystemManager manager = this.getFileSystemManager();
+        String prefix = this.getPrefix(mapping);
+        String path = this.path();
+        FileSystemOptions options = this.getOptions(mapping);
+        try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
+            if (fo.exists() && fo.isFile()) {
+                try (InputStream input = fo.getContent().getInputStream()) {
+                    length = IOUtils.copyLarge(input, output);
+                }
+            } else {
+                throw new FileNotFoundException(
+                        fo.getPublicURIString() + " not existed, object:" + this.toString() + ".");
+            }
+            if (!Objects.equals(StorageProtocol.webdav, mapping.getProtocol())) {
+                /* Closing webdav would also try to close commons.httpClient. */
+                manager.closeFileSystem(fo.getFileSystem());
+            }
+        }
+        return length;
+    }
+
+    private boolean vfsExistContent(StorageMapping mapping) throws Exception {
+        FileSystemManager manager = this.getFileSystemManager();
+        String prefix = this.getPrefix(mapping);
+        String path = this.path();
+        FileSystemOptions options = this.getOptions(mapping);
+        try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
+            return (fo.exists() && fo.isFile());
+        }
+    }
+
+    // Delete the content; if the immediate parent directory (checked one level up only) is empty, delete it as well.
+    private void vfsDeleteContent(StorageMapping mapping) throws Exception {
+        FileSystemManager manager = this.getFileSystemManager();
+        String prefix = this.getPrefix(mapping);
+        String path = this.path(DELETE_OPERATE);
+        FileSystemOptions options = this.getOptions(mapping);
+        try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
+            if (fo.exists() && fo.isFile()) {
+                fo.delete();
+                if ((!StringUtils.startsWith(path, PATHSEPARATOR)) && (StringUtils.contains(path, PATHSEPARATOR))) {
+                    FileObject parent = fo.getParent();
+                    if ((null != parent) && parent.exists() && parent.isFolder()
+                            && (parent.getChildren().length == 0)) {
+                        parent.delete();
+                    }
+                }
+            }
+            if (!Objects.equals(StorageProtocol.webdav, mapping.getProtocol())) {
+                // Closing webdav would also try to close commons.httpClient.
+                manager.closeFileSystem(fo.getFileSystem());
+            }
+        }
+    }
+
+    private long hdfsUpdateContent(StorageMapping mapping, byte[] bytes) throws Exception {
+        try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
+                .get(hdfsConfiguration(mapping))) {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
+            if (fileSystem.exists(path)) {
+                fileSystem.delete(path, false);
+            }
+            try (org.apache.hadoop.fs.FSDataOutputStream out = fileSystem.create(path)) {
+                out.write(bytes);
+                this.setStorage(mapping.getName());
+                this.setLastUpdateTime(new Date());
+                this.setLength((long) bytes.length);
+            }
+        }
+        return bytes.length;
+    }
+
+    private Long hdfsReadContent(StorageMapping mapping, OutputStream output) throws Exception {
+        long length = -1L;
+        try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
+                .get(hdfsConfiguration(mapping))) {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
+            if (fileSystem.exists(path)) {
+                try (org.apache.hadoop.fs.FSDataInputStream inputStream = fileSystem.open(path)) {
+                    length = IOUtils.copyLarge(inputStream, output);
+                }
+            } else {
+                throw new FileNotFoundException(path + " not existed, object:" + this.toString() + ".");
+            }
+        }
+        return length;
+    }
+
+    private boolean hdfsExistContent(StorageMapping mapping) throws Exception {
+        try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
+                .get(hdfsConfiguration(mapping))) {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
+            return fileSystem.exists(path);
+        }
+    }
+
+    private void hdfsDeleteContent(StorageMapping mapping) throws Exception {
+        try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
+                .get(hdfsConfiguration(mapping))) {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
+            if (fileSystem.exists(path)) {
+                fileSystem.delete(path, false);
+            }
+        }
+    }
+
+    private org.apache.hadoop.conf.Configuration hdfsConfiguration(StorageMapping mapping) {
+        if ((!StringUtils.equals(System.getProperty("HADOOP_USER_NAME"), mapping.getUsername()))
+                && StringUtils.isNotBlank(mapping.getUsername())) {
+            System.setProperty("HADOOP_USER_NAME", mapping.getUsername());
+        }
+        org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+        configuration.set("fs.default.name",
+                StorageProtocol.hdfs + "://" + mapping.getHost() + ":" + mapping.getPort());
+        configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        return configuration;
+    }
 }
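The refactor above gives every public storage operation the same shape: a thin dispatcher that routes to an hdfs* or vfs* implementation based on the mapping's protocol. A stripped-down, runnable sketch of that pattern (the enum and the handler bodies here are illustrative stand-ins for the real StorageMapping/StorageObject types):

```java
public class DispatchDemo {
    // Mirrors the values of com.x.base.core.entity.StorageProtocol after this commit.
    enum Protocol { ftp, ftps, webdav, cifs, file, sftp, hdfs }

    // Mirrors readContent/updateContent/deleteContent: one branch per backend,
    // chosen solely by the configured protocol.
    static String readContent(Protocol protocol) {
        if (Protocol.hdfs == protocol) {
            return hdfsReadContent();
        } else {
            return vfsReadContent();
        }
    }

    static String hdfsReadContent() { return "read via hadoop FileSystem"; }

    static String vfsReadContent() { return "read via commons-vfs2"; }

    public static void main(String[] args) {
        System.out.println(readContent(Protocol.hdfs)); // read via hadoop FileSystem
        System.out.println(readContent(Protocol.ftp));  // read via commons-vfs2
    }
}
```

Keeping the dispatch in the public methods means callers are unchanged while each backend keeps its own resource-management rules (VFS filesystem caching vs. try-with-resources on the Hadoop FileSystem).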
o2server/x_base_core_project/src/main/java/com/x/base/core/entity/StorageProtocol.java

 package com.x.base.core.entity;

 public enum StorageProtocol {

-    ftp, ftps, webdav, cifs, file, sftp;
+    ftp, ftps, webdav, cifs, file, sftp, hdfs;

     public static final int length = JpaObject.length_16B;
 }
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Config.java

@@ -106,6 +106,14 @@ public class Config {
     public static final String DIR_COMMONS_EXT = "commons/ext";
     public static final String DIR_COMMONS_LANGUAGE = "commons/language";
     public static final String DIR_COMMONS_FONTS = "commons/fonts";
+    public static final String DIR_COMMONS_HADOOP = DIR_COMMONS + "/hadoop";
+    public static final String DIR_COMMONS_HADOOP_WINDOWS = DIR_COMMONS_HADOOP + "/" + OS_WINDOWS;
+    public static final String DIR_COMMONS_HADOOP_AIX = DIR_COMMONS + "/ " + OS_AIX;
+    public static final String DIR_COMMONS_HADOOP_LINUX = DIR_COMMONS + "/" + OS_LINUX;
+    public static final String DIR_COMMONS_HADOOP_MACOS = DIR_COMMONS + "/" + OS_MACOS;
+    public static final String DIR_COMMONS_HADOOP_RASPI = DIR_COMMONS + "/" + OS_RASPI;
+    public static final String DIR_COMMONS_HADOOP_ARM = DIR_COMMONS + "/" + OS_ARM;
+    public static final String DIR_COMMONS_HADOOP_MIPS = DIR_COMMONS + "/" + OS_MIPS;
     public static final String DIR_CONFIG = "config";
     public static final String DIR_CONFIG_COVERTOWEBSERVER = "config/coverToWebServer";
     public static final String DIR_CONFIGSAMPLE = "configSample";
@@ -114,11 +122,11 @@ public class Config {
     public static final String DIR_DYNAMIC = "dynamic";
     public static final String DIR_DYNAMIC_JARS = "dynamic/jars";
     public static final String DIR_JVM = "jvm";
-    public static final String DIR_JVM_AIX = "jvm/aix";
-    public static final String DIR_JVM_LINUX = "jvm/linux";
-    public static final String DIR_JVM_MACOS = "jvm/macos";
-    public static final String DIR_JVM_WINDOWS = "jvm/windows";
-    public static final String DIR_JVM_NEOKYLIN_LOONGSON = "jvm/neokylin_loongson";
+    // public static final String DIR_JVM_AIX = "jvm/aix";
+    // public static final String DIR_JVM_LINUX = "jvm/linux";
+    // public static final String DIR_JVM_MACOS = "jvm/macos";
+    // public static final String DIR_JVM_WINDOWS = "jvm/windows";
+    // public static final String DIR_JVM_NEOKYLIN_LOONGSON = "jvm/neokylin_loongson";
     public static final String DIR_LOCAL = "local";
     public static final String DIR_LOCAL_BACKUP = "local/backup";
     public static final String DIR_LOCAL_UPDATE = "local/update";
@@ -1339,4 +1347,116 @@ public class Config {
         initialContext().rebind(RESOURCE_NODE_PROCESSPLATFORMEXECUTORS, executorServices);
     }
+
+    public static boolean isWindowsJava8() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_WINDOWS));
+    }
+
+    public static boolean isLinuxJava8() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_LINUX));
+    }
+
+    public static boolean isRaspiJava8() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_RASPI));
+    }
+
+    public static boolean isArmJava8() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_ARM));
+    }
+
+    public static boolean isMipsJava8() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_MIPS));
+    }
+
+    public static boolean isAixJava8() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_AIX));
+    }
+
+    public static boolean isMacosJava8() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_MACOS));
+    }
+
+    public static boolean isWindowsJava11() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_WINDOWS + "_" + JAVAVERSION_JAVA11));
+    }
+
+    public static boolean isLinuxJava11() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_LINUX + "_" + JAVAVERSION_JAVA11));
+    }
+
+    public static boolean isRaspiJava11() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_RASPI + "_" + JAVAVERSION_JAVA11));
+    }
+
+    public static boolean isArmJava11() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_ARM + "_" + JAVAVERSION_JAVA11));
+    }
+
+    public static boolean isMipsJava11() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_MIPS + "_" + JAVAVERSION_JAVA11));
+    }
+
+    public static boolean isAixJava11() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_AIX + "_" + JAVAVERSION_JAVA11));
+    }
+
+    public static boolean isMacosJava11() throws Exception {
+        return command_java_path().startsWith(dir_jvm().toPath().resolve(OS_MACOS + "_" + JAVAVERSION_JAVA11));
+    }
+
+    public static Path path_commons_hadoop_windows(boolean force) throws Exception {
+        Path path = Paths.get(base(), DIR_COMMONS_HADOOP_WINDOWS);
+        if ((!Files.exists(path)) && force) {
+            Files.createDirectories(path);
+        }
+        return path;
+    }
+
+    public static Path path_commons_hadoop_linux(boolean force) throws Exception {
+        Path path = Paths.get(base(), DIR_COMMONS_HADOOP_LINUX);
+        if ((!Files.exists(path)) && force) {
+            Files.createDirectories(path);
+        }
+        return path;
+    }
+
+    public static Path path_commons_hadoop_aix(boolean force) throws Exception {
+        Path path = Paths.get(base(), DIR_COMMONS_HADOOP_AIX);
+        if ((!Files.exists(path)) && force) {
+            Files.createDirectories(path);
+        }
+        return path;
+    }
+
+    public static Path path_commons_hadoop_macos(boolean force) throws Exception {
+        Path path = Paths.get(base(), DIR_COMMONS_HADOOP_MACOS);
+        if ((!Files.exists(path)) && force) {
+            Files.createDirectories(path);
+        }
+        return path;
+    }
+
+    public static Path path_commons_hadoop_raspi(boolean force) throws Exception {
+        Path path = Paths.get(base(), DIR_COMMONS_HADOOP_RASPI);
+        if ((!Files.exists(path)) && force) {
+            Files.createDirectories(path);
+        }
+        return path;
+    }
+
+    public static Path path_commons_hadoop_arm(boolean force) throws Exception {
+        Path path = Paths.get(base(), DIR_COMMONS_HADOOP_ARM);
+        if ((!Files.exists(path)) && force) {
+            Files.createDirectories(path);
+        }
+        return path;
+    }
+
+    public static Path path_commons_hadoop_mips(boolean force) throws Exception {
+        Path path = Paths.get(base(), DIR_COMMONS_HADOOP_MIPS);
+        if ((!Files.exists(path)) && force) {
+            Files.createDirectories(path);
+        }
+        return path;
+    }
 }
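The new path_commons_hadoop_* accessors all share one behavior: resolve a directory under the installation base and, when `force` is true, create it if it does not exist yet. A stand-alone sketch of that idiom using java.nio (the temp-dir base and the `resolveDir` helper are illustrative, not o2server's real `base()`):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class HadoopPathDemo {
    // Mirrors path_commons_hadoop_windows(boolean force): return base/sub,
    // creating the directory only when force is set and it is missing.
    static Path resolveDir(Path base, String sub, boolean force) throws IOException {
        Path path = base.resolve(sub);
        if (!Files.exists(path) && force) {
            Files.createDirectories(path);
        }
        return path;
    }

    public static void main(String[] args) throws Exception {
        Path base = Files.createTempDirectory("o2demo");
        Path p = resolveDir(base, "commons/hadoop/windows", false);
        System.out.println(Files.exists(p)); // false: not created without force
        resolveDir(base, "commons/hadoop/windows", true);
        System.out.println(Files.exists(p)); // true: created on demand
    }
}
```

Passing `force = false`, as Hadoop.setHomeDir does, makes the lookup side-effect free: the property is set to the expected location whether or not the bundled Hadoop binaries are actually present.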
o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/ExternalStorageSource.java

@@ -9,20 +9,22 @@ import com.x.base.core.project.tools.Crypto;
 public class ExternalStorageSource extends ConfigObject {

     private static final long serialVersionUID = 5926439816241094368L;

+    // No need to persist.
+    private transient String _password;
+
     public ExternalStorageSource() {
-        this.protocol = default_protocol;
-        this.username = default_username;
-        this.password = default_password;
-        this.host = default_host;
-        this.port = default_port;
-        this.prefix = default_prefix;
-        this.enable = default_enable;
-        this.weight = default_weight;
-        this.name = default_name;
-        this.deepPath = default_deepPath;
+        this.protocol = DEFAULT_PROTOCOL;
+        this.username = DEFAULT_USERNAME;
+        this.password = DEFAULT_PASSWORD;
+        this.host = DEFAULT_HOST;
+        this.port = DEFAULT_PORT;
+        this.prefix = DEFAULT_PREFIX;
+        this.enable = DEFAULT_ENABLE;
+        this.weight = DEFAULT_WEIGHT;
+        this.name = DEFAULT_NAME;
+        this.deepPath = DEFAULT_DEEPPATH;
     }

     public static ExternalStorageSource defaultInstance() {
@@ -30,18 +32,18 @@ public class ExternalStorageSource extends ConfigObject {
     }

-    public static final StorageProtocol default_protocol = StorageProtocol.webdav;
-    public static final String default_username = "admin";
-    public static final String default_password = "admin";
-    public static final String default_host = "127.0.0.1";
-    public static final Integer default_port = 8080;
-    public static final String default_prefix = "";
-    public static final Integer default_weight = 100;
-    public static final Boolean default_enable = true;
-    public static final String default_name = "251";
-    public static final Boolean default_deepPath = false;
+    public static final StorageProtocol DEFAULT_PROTOCOL = StorageProtocol.webdav;
+    public static final String DEFAULT_USERNAME = "admin";
+    public static final String DEFAULT_PASSWORD = "admin";
+    public static final String DEFAULT_HOST = "127.0.0.1";
+    public static final Integer DEFAULT_PORT = 8080;
+    public static final String DEFAULT_PREFIX = "";
+    public static final Integer DEFAULT_WEIGHT = 100;
+    public static final Boolean DEFAULT_ENABLE = true;
+    public static final String DEFAULT_NAME = "251";
+    public static final Boolean DEFAULT_DEEPPATH = false;

-    @FieldDescribe("Protocol; allowed values: ftp, webdav")
+    @FieldDescribe("Protocol; allowed values: ftp, webdav ... ")
     private StorageProtocol protocol;

     @FieldDescribe("Login username.")
     private String username;
o2server/x_console/src/main/java/com/x/server/console/Hadoop.java (new file, 0 → 100644)

+package com.x.server.console;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import com.x.base.core.project.config.Config;
+
+public class Hadoop {
+
+    private Hadoop() {
+        // nothing
+    }
+
+    public static void init() throws Exception {
+        setHomeDir();
+    }
+
+    private static void setHomeDir() throws Exception {
+        if (Config.isWindowsJava11()) {
+            System.setProperty("hadoop.home.dir", Config.path_commons_hadoop_windows(false).toString());
+            // [qtp680227777-226] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to
+            // load native-hadoop library for your platform... using builtin-java classes
+            // where applicable
+            // System.setProperty("log4j.logger.org.apache.hadoop.util.NativeCodeLoader",
+            // "ERROR");
+            System.setProperty("java.library.path",
+                    Config.path_commons_hadoop_windows(false).resolve("bin").toString()
+                            + System.getProperty("path.separator") + System.getProperty("java.library.path"));
+        }
+    }
+}
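setHomeDir prepends the bundled Hadoop bin directory to java.library.path using the platform path separator. The string manipulation itself can be sketched in isolation (note this only demonstrates the joining; the commit applies it on the Windows/Java 11 branch only, and whether a JVM re-reads java.library.path after startup is implementation-dependent):

```java
public class LibraryPathDemo {
    // Mirrors the property update in Hadoop.setHomeDir: new entry first, then
    // the platform separator (';' on Windows, ':' elsewhere), then the old value,
    // so the bundled Hadoop bin directory is searched before existing entries.
    static String prepend(String entry, String existing, String separator) {
        return entry + separator + existing;
    }

    public static void main(String[] args) {
        String sep = System.getProperty("path.separator");
        System.out.println(prepend("commons/hadoop/windows/bin",
                System.getProperty("java.library.path"), sep));
    }
}
```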
o2server/x_console/src/main/java/com/x/server/console/Main.java

@@ -109,6 +109,8 @@ public class Main {
         LogTools.setSlf4jSimple();
         ResourceFactory.bind();
         CommandFactory.printStartHelp();
+        // Initialize the hadoop environment.
+        Hadoop.init();
     }

     public static void main(String[] args) throws Exception {