提交 a4f9a549 编写于 作者: F Flavio Pompermaier 提交者: Stephan Ewen

[FLINK-6103] [core] Improve rename() on LocalFileSystem

This closes #3598
上级 a0249d99
......@@ -20,7 +20,7 @@
/**
* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
* additional information regarding copyright ownership.
*/
package org.apache.flink.core.fs.local;
......@@ -43,7 +43,10 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -190,7 +193,7 @@ public class LocalFileSystem extends FileSystem {
/**
* Deletes the given file or directory.
*
*
* @param f
* the file to be deleted
* @return <code>true</code> if all files were deleted successfully, <code>false</code> otherwise
......@@ -217,12 +220,13 @@ public class LocalFileSystem extends FileSystem {
/**
* Recursively creates the directory specified by the provided path.
*
*
* @return <code>true</code>if the directories either already existed or have been created successfully,
* <code>false</code> otherwise
* @throws IOException
* thrown if an error occurred while creating the directory/directories
*/
@Override
public boolean mkdirs(final Path f) throws IOException {
final File p2f = pathToFile(f);
......@@ -262,8 +266,18 @@ public class LocalFileSystem extends FileSystem {
public boolean rename(final Path src, final Path dst) throws IOException {
final File srcFile = pathToFile(src);
final File dstFile = pathToFile(dst);
return srcFile.renameTo(dstFile);
//Files.move fails if the destination directory doesn't exist
if(dstFile.getParentFile()!= null && !dstFile.getParentFile().exists()){
dstFile.getParentFile().mkdirs();
}
try {
Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
return true;
}
catch (FileAlreadyExistsException | DirectoryNotEmptyException | SecurityException ex) {
//Catch the errors that are regular "move failed" exceptions and return false
return false;
}
}
@Override
......
......@@ -36,6 +36,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.util.FileUtils;
......@@ -109,7 +110,7 @@ public class LocalFileSystemTest {
*/
// create files.. one ""natively"", one using lfs
final FSDataOutputStream lfsoutput1 = lfs.create(pathtotestfile1, false);
final FSDataOutputStream lfsoutput1 = lfs.create(pathtotestfile1, WriteMode.NO_OVERWRITE);
assertTrue(testfile2.createNewFile());
// does lfs create files? does lfs recognize created files?
......@@ -127,7 +128,7 @@ public class LocalFileSystemTest {
try (FileInputStream fisfile1 = new FileInputStream(testfile1)) {
assertEquals(testbytestest.length, fisfile1.read(testbytestest));
}
assertArrayEquals(testbytes, testbytestest);
// does lfs see the correct file length?
......@@ -212,4 +213,51 @@ public class LocalFileSystemTest {
assertTrue(FileUtils.deletePathIfEmpty(fs, directoryPath));
assertFalse(fs.exists(directoryPath));
}
@Test
public void testRenamePath() throws IOException {
File rootDirectory = temporaryFolder.newFolder();
//create a file /root/src/B/test.csv
File srcDirectory = new File(rootDirectory, "src");
srcDirectory = new File(srcDirectory, "B");
assertTrue(srcDirectory.mkdirs());
File srcFile = new File(srcDirectory, "test.csv");
assertTrue(srcFile.createNewFile());
//Move/rename B and its content to /root/dst/A
File destDirectory = new File(rootDirectory, "dst");
destDirectory = new File(destDirectory, "B");
File destFile = new File(destDirectory, "test.csv");
Path srcDirPath = new Path(srcDirectory.toURI());
Path srcFilePath = new Path(srcFile.toURI());
Path destDirPath = new Path(destDirectory.toURI());
Path destFilePath = new Path(destFile.toURI());
FileSystem fs = FileSystem.getLocalFileSystem();
// pre-conditions: /root/src/B exists but /root/dst/B does not
assertTrue(fs.exists(srcDirPath));
assertFalse(fs.exists(destDirPath));
// do the move/rename: /root/src/B -> /root/dst/
assertTrue(fs.rename(srcDirPath, destDirPath));
// post-conditions: /root/src/B doens't exists, /root/dst/B/test.csv has been created
assertTrue(fs.exists(destFilePath));
assertFalse(fs.exists(srcDirPath));
//re-create initial situation and test overwrite
assertTrue(fs.delete(destDirPath, true));
assertTrue(srcDirectory.mkdirs());
assertTrue(srcFile.createNewFile());
//now use the file as dest: /root/src/B/test.csv -> /root/dst/B/test.csv
assertTrue(fs.rename(srcFilePath, destFilePath));
// post-conditions: now only the src file has been moved
assertFalse(fs.exists(srcFilePath));
assertTrue(fs.exists(srcDirPath));
assertTrue(fs.exists(destFilePath));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册