Commit 1f775aed authored by: Z zhourui

Adjust relative path resolution

Parent 9be9400a
......@@ -161,62 +161,7 @@ public abstract class StorageObject extends SliceJpaObject {
}
private Long vfsUpdateContent(StorageMapping mapping, byte[] bytes) throws Exception {
String prefix = this.getPrefix(mapping);
String path = this.path();
if (StringUtils.isEmpty(path)) {
throw new IllegalStateException("path cannot be empty.");
}
FileSystemOptions options = this.getOptions(mapping);
long length = -1L;
FileSystemManager manager = this.getFileSystemManager();
/*
* Two attempts are needed: when the front end dispatches requests through nginx,
* several files may be uploaded at the same time, and concurrent uploads may try to
* create the same storage directory simultaneously, which fails on the server with
* org.apache.commons.vfs2.FileSystemException: Could not create folder
* "ftp://processPlatform:***@o2.server01.com:20040/20200601/1beb018a-5009-4baa-a9ef-7e903f9d48ef".
* Issuing the request again and re-resolving the file works around the problem.
*/
for (int i = 0; i < 2; i++) {
try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options);
OutputStream output = fo.getContent().getOutputStream()) {
length = IOUtils.copyLarge(new ByteArrayInputStream(bytes), output);
this.setLength(length);
if ((!Objects.equals(StorageProtocol.webdav, mapping.getProtocol()))
&& (!Objects.equals(StorageProtocol.sftp, mapping.getProtocol()))) {
/* Closing the file system for webdav would also try to close commons.httpClient */
manager.closeFileSystem(fo.getFileSystem());
}
this.setStorage(mapping.getName());
this.setLastUpdateTime(new Date());
break;
} catch (FileSystemException fse) {
if (i != 0) {
// Ignore the first failure and run the second attempt; if that also fails, rethrow.
throw fse;
}
}
}
return length;
}
private long hdfsUpdateContent(StorageMapping mapping, byte[] bytes) throws Exception {
try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
.get(hdfsConfiguration(mapping))) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
if (fileSystem.exists(path)) {
fileSystem.delete(path, false);
}
try (org.apache.hadoop.fs.FSDataOutputStream out = fileSystem.create(path)) {
out.write(bytes);
this.setStorage(mapping.getName());
this.setLastUpdateTime(new Date());
this.setLength((long) bytes.length);
}
}
return bytes.length;
}
/** Read the content. */
// Read the content
public byte[] readContent(StorageMapping mapping) throws Exception {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
readContent(mapping, baos);
......@@ -224,7 +169,7 @@ public abstract class StorageObject extends SliceJpaObject {
}
}
/** Stream the content to the output. */
// Stream the content to the output
public Long readContent(StorageMapping mapping, OutputStream output) throws Exception {
if (Objects.equals(mapping.getProtocol(), StorageProtocol.hdfs)) {
return hdfsReadContent(mapping, output);
......@@ -233,48 +178,7 @@ public abstract class StorageObject extends SliceJpaObject {
}
}
// Read data via VFS
private Long vfsReadContent(StorageMapping mapping, OutputStream output) throws Exception {
long length = -1L;
FileSystemManager manager = this.getFileSystemManager();
String prefix = this.getPrefix(mapping);
String path = this.path();
FileSystemOptions options = this.getOptions(mapping);
try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
if (fo.exists() && fo.isFile()) {
try (InputStream input = fo.getContent().getInputStream()) {
length = IOUtils.copyLarge(input, output);
}
} else {
throw new FileNotFoundException(
fo.getPublicURIString() + " does not exist, object:" + this.toString() + ".");
}
if (!Objects.equals(StorageProtocol.webdav, mapping.getProtocol())) {
/* Closing the file system for webdav would also try to close commons.httpClient */
manager.closeFileSystem(fo.getFileSystem());
}
}
return length;
}
// Read data from HDFS
private Long hdfsReadContent(StorageMapping mapping, OutputStream output) throws Exception {
long length = -1L;
try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
.get(hdfsConfiguration(mapping))) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
if (fileSystem.exists(path)) {
try (org.apache.hadoop.fs.FSDataInputStream inputStream = fileSystem.open(path)) {
length = IOUtils.copyLarge(inputStream, output);
}
} else {
throw new FileNotFoundException(path + " does not exist, object:" + this.toString() + ".");
}
}
return length;
}
/** Check whether the content exists. */
// Check whether the content exists
public boolean existContent(StorageMapping mapping) throws Exception {
if (Objects.equals(mapping.getProtocol(), StorageProtocol.hdfs)) {
return hdfsExistContent(mapping);
......@@ -283,28 +187,6 @@ public abstract class StorageObject extends SliceJpaObject {
}
}
private boolean vfsExistContent(StorageMapping mapping) throws Exception {
FileSystemManager manager = this.getFileSystemManager();
String prefix = this.getPrefix(mapping);
String path = this.path();
FileSystemOptions options = this.getOptions(mapping);
try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
return (fo.exists() && fo.isFile());
}
}
private boolean hdfsExistContent(StorageMapping mapping) throws Exception {
try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
.get(hdfsConfiguration(mapping))) {
org.apache.hadoop.fs.Path path = fileSystem.getHomeDirectory();
if (StringUtils.isNotEmpty(mapping.getPrefix())) {
path = new org.apache.hadoop.fs.Path(path, mapping.getPrefix());
}
path = new org.apache.hadoop.fs.Path(path, this.path());
return fileSystem.exists(path);
}
}
public void deleteContent(StorageMapping mapping) throws Exception {
if (Objects.equals(mapping.getProtocol(), StorageProtocol.hdfs)) {
hdfsDeleteContent(mapping);
......@@ -313,40 +195,6 @@ public abstract class StorageObject extends SliceJpaObject {
}
}
private void hdfsDeleteContent(StorageMapping mapping) throws Exception {
try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
.get(hdfsConfiguration(mapping))) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
if (fileSystem.exists(path)) {
fileSystem.delete(path, false);
}
}
}
// Delete the content; also check whether the parent directory (one level up only) is empty, and delete it if so
private void vfsDeleteContent(StorageMapping mapping) throws Exception {
FileSystemManager manager = this.getFileSystemManager();
String prefix = this.getPrefix(mapping);
String path = this.path(DELETE_OPERATE);
FileSystemOptions options = this.getOptions(mapping);
try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
if (fo.exists() && fo.isFile()) {
fo.delete();
if ((!StringUtils.startsWith(path, PATHSEPARATOR)) && (StringUtils.contains(path, PATHSEPARATOR))) {
FileObject parent = fo.getParent();
if ((null != parent) && parent.exists() && parent.isFolder()
&& (parent.getChildren().length == 0)) {
parent.delete();
}
}
}
if (!Objects.equals(StorageProtocol.webdav, mapping.getProtocol())) {
// Closing the file system for webdav would also try to close commons.httpClient
manager.closeFileSystem(fo.getFileSystem());
}
}
}
// Get the first half of the full access path
private String getPrefix(StorageMapping mapping) throws IllegalStateException, UnsupportedEncodingException {
String prefix = "";
......@@ -400,7 +248,7 @@ public abstract class StorageObject extends SliceJpaObject {
case sftp:
FtpFileSystemConfigBuilder sftpBuilder = FtpFileSystemConfigBuilder.getInstance();
sftpBuilder.setPassiveMode(opts, Config.vfs().getSftp().getPassive());
/** Force remote IP verification off. */
// Force remote IP verification off
sftpBuilder.setRemoteVerification(opts, false);
sftpBuilder.setFileType(opts, FtpFileType.BINARY);
sftpBuilder.setConnectTimeout(opts, 10000);
......@@ -424,9 +272,6 @@ public abstract class StorageObject extends SliceJpaObject {
* java.net.Socket.connect(Socket.java:589)
*/
ftpBuilder.setPassiveMode(opts, Config.vfs().getFtp().getPassive());
/*
* builder.setPassiveMode(opts, false);
*/
// Force remote IP verification off
ftpBuilder.setRemoteVerification(opts, false);
// FtpFileType.BINARY is the default
......@@ -466,6 +311,153 @@ public abstract class StorageObject extends SliceJpaObject {
return opts;
}
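// Write the content to the VFS backend, retrying once on a FileSystemException.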
private Long vfsUpdateContent(StorageMapping mapping, byte[] bytes) throws Exception {
String prefix = this.getPrefix(mapping);
String path = this.path();
if (StringUtils.isEmpty(path)) {
throw new IllegalStateException("path cannot be empty.");
}
FileSystemOptions options = this.getOptions(mapping);
long length = -1L;
FileSystemManager manager = this.getFileSystemManager();
/*
* Two attempts are needed: when the front end dispatches requests through nginx,
* several files may be uploaded at the same time, and concurrent uploads may try to
* create the same storage directory simultaneously, which fails on the server with
* org.apache.commons.vfs2.FileSystemException: Could not create folder
* "ftp://processPlatform:***@o2.server01.com:20040/20200601/1beb018a-5009-4baa-a9ef-7e903f9d48ef".
* Issuing the request again and re-resolving the file works around the problem.
*/
for (int i = 0; i < 2; i++) {
try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options);
OutputStream output = fo.getContent().getOutputStream()) {
length = IOUtils.copyLarge(new ByteArrayInputStream(bytes), output);
this.setLength(length);
if ((!Objects.equals(StorageProtocol.webdav, mapping.getProtocol()))
&& (!Objects.equals(StorageProtocol.sftp, mapping.getProtocol()))) {
/* Closing the file system for webdav would also try to close commons.httpClient */
manager.closeFileSystem(fo.getFileSystem());
}
this.setStorage(mapping.getName());
this.setLastUpdateTime(new Date());
break;
} catch (FileSystemException fse) {
if (i != 0) {
// Ignore the first failure and run the second attempt; if that also fails, rethrow.
throw fse;
}
}
}
return length;
}
// Read data via VFS
private Long vfsReadContent(StorageMapping mapping, OutputStream output) throws Exception {
long length = -1L;
FileSystemManager manager = this.getFileSystemManager();
String prefix = this.getPrefix(mapping);
String path = this.path();
FileSystemOptions options = this.getOptions(mapping);
try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
if (fo.exists() && fo.isFile()) {
try (InputStream input = fo.getContent().getInputStream()) {
length = IOUtils.copyLarge(input, output);
}
} else {
throw new FileNotFoundException(
fo.getPublicURIString() + " does not exist, object:" + this.toString() + ".");
}
if (!Objects.equals(StorageProtocol.webdav, mapping.getProtocol())) {
/* Closing the file system for webdav would also try to close commons.httpClient */
manager.closeFileSystem(fo.getFileSystem());
}
}
return length;
}
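// Check via VFS whether the content exists as a regular file.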
private boolean vfsExistContent(StorageMapping mapping) throws Exception {
FileSystemManager manager = this.getFileSystemManager();
String prefix = this.getPrefix(mapping);
String path = this.path();
FileSystemOptions options = this.getOptions(mapping);
try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
return (fo.exists() && fo.isFile());
}
}
// Delete the content; also check whether the parent directory (one level up only) is empty, and delete it if so
private void vfsDeleteContent(StorageMapping mapping) throws Exception {
FileSystemManager manager = this.getFileSystemManager();
String prefix = this.getPrefix(mapping);
String path = this.path(DELETE_OPERATE);
FileSystemOptions options = this.getOptions(mapping);
try (FileObject fo = manager.resolveFile(prefix + PATHSEPARATOR + path, options)) {
if (fo.exists() && fo.isFile()) {
fo.delete();
if ((!StringUtils.startsWith(path, PATHSEPARATOR)) && (StringUtils.contains(path, PATHSEPARATOR))) {
FileObject parent = fo.getParent();
if ((null != parent) && parent.exists() && parent.isFolder()
&& (parent.getChildren().length == 0)) {
parent.delete();
}
}
}
if (!Objects.equals(StorageProtocol.webdav, mapping.getProtocol())) {
// Closing the file system for webdav would also try to close commons.httpClient
manager.closeFileSystem(fo.getFileSystem());
}
}
}
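// Write the content to HDFS, deleting any existing file at the path first.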
private long hdfsUpdateContent(StorageMapping mapping, byte[] bytes) throws Exception {
try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
.get(hdfsConfiguration(mapping))) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
if (fileSystem.exists(path)) {
fileSystem.delete(path, false);
}
try (org.apache.hadoop.fs.FSDataOutputStream out = fileSystem.create(path)) {
out.write(bytes);
this.setStorage(mapping.getName());
this.setLastUpdateTime(new Date());
this.setLength((long) bytes.length);
}
}
return bytes.length;
}
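// Read data from HDFS into the given output stream.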
private Long hdfsReadContent(StorageMapping mapping, OutputStream output) throws Exception {
long length = -1L;
try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
.get(hdfsConfiguration(mapping))) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
if (fileSystem.exists(path)) {
try (org.apache.hadoop.fs.FSDataInputStream inputStream = fileSystem.open(path)) {
length = IOUtils.copyLarge(inputStream, output);
}
} else {
throw new FileNotFoundException(path + " does not exist, object:" + this.toString() + ".");
}
}
return length;
}
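// Check whether the content exists on HDFS.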
private boolean hdfsExistContent(StorageMapping mapping) throws Exception {
try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
.get(hdfsConfiguration(mapping))) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
return fileSystem.exists(path);
}
}
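// Delete the content from HDFS if it exists.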
private void hdfsDeleteContent(StorageMapping mapping) throws Exception {
try (org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem
.get(hdfsConfiguration(mapping))) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(getPrefix(mapping), this.path());
if (fileSystem.exists(path)) {
fileSystem.delete(path, false);
}
}
}
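// Build the Hadoop configuration for the mapping.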
private org.apache.hadoop.conf.Configuration hdfsConfiguration(StorageMapping mapping) {
if ((!StringUtils.equals(System.getProperty("HADOOP_USER_NAME"), mapping.getUsername()))
&& StringUtils.isNotBlank(mapping.getUsername())) {
......