From c56be665cf27c11cda65d707104507cbe5ccf1c1 Mon Sep 17 00:00:00 2001 From: vasia Date: Thu, 11 Dec 2014 00:49:59 +0100 Subject: [PATCH] [FLINK-1307] Allow file input from nested directory structure This closes #260 --- docs/programming_guide.md | 43 ++- .../flink/api/common/io/FileInputFormat.java | 95 ++++-- .../common/io/EnumerateNestedFilesTest.java | 275 ++++++++++++++++++ .../api/common/io/FileInputFormatTest.java | 51 ++++ .../apache/flink/testutils/TestFileUtils.java | 22 +- 5 files changed, 460 insertions(+), 26 deletions(-) create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java diff --git a/docs/programming_guide.md b/docs/programming_guide.md index 6c32250bbf6..5a8614aaf34 100644 --- a/docs/programming_guide.md +++ b/docs/programming_guide.md @@ -1584,7 +1584,7 @@ Data Sources
Data sources create the initial data sets, such as from files or from Java collections. The general -mechanism of of creating data sets is abstracted behind an +mechanism of creating data sets is abstracted behind an {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java "InputFormat"%}. Flink comes with several built-in formats to create data sets from common file formats. Many of them have @@ -1662,11 +1662,30 @@ DataSet dbData = // manually provide the type information as shown in the examples above. {% endhighlight %} +#### Recursive Traversal of the Input Path Directory + +For file-based inputs, when the input path is a directory, nested files are not enumerated by default. Instead, only the files inside the base directory are read, while nested files are ignored. Recursive enumeration of nested files can be enabled through the `recursive.file.enumeration` configuration parameter, as shown in the following example. + +{% highlight java %} +// enable recursive enumeration of nested input files +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +// create a configuration object +Configuration parameters = new Configuration(); + +// set the recursive enumeration parameter +parameters.setBoolean("recursive.file.enumeration", true); + +// pass the configuration to the data source +DataSet<String> logs = env.readTextFile("file:///path/with.nested/files") + .withParameters(parameters); +{% endhighlight %} +
Data sources create the initial data sets, such as from files or from Java collections. The general -mechanism of of creating data sets is abstracted behind an +mechanism of creating data sets is abstracted behind an {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java "InputFormat"%}. Flink comes with several built-in formats to create data sets from common file formats. Many of them have @@ -1730,9 +1749,27 @@ val values = env.fromElements("Foo", "bar", "foobar", "fubar") // generate a number sequence val numbers = env.generateSequence(1, 10000000); {% endhighlight %} + +#### Recursive Traversal of the Input Path Directory + +For file-based inputs, when the input path is a directory, nested files are not enumerated by default. Instead, only the files inside the base directory are read, while nested files are ignored. Recursive enumeration of nested files can be enabled through the `recursive.file.enumeration` configuration parameter, as shown in the following example. + +{% highlight scala %} +// enable recursive enumeration of nested input files +val env = ExecutionEnvironment.getExecutionEnvironment + +// create a configuration object +val parameters = new Configuration + +// set the recursive enumeration parameter +parameters.setBoolean("recursive.file.enumeration", true) + +// pass the configuration to the data source +env.readTextFile("file:///path/with.nested/files").withParameters(parameters) +{% endhighlight %} +
- [Back to top](#top) Data Sinks diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 30426f6b046..2f3f59f25cc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -66,7 +66,7 @@ public abstract class FileInputFormat implements InputFormat implements InputFormat implements InputFormat implements InputFormat implements InputFormat implements InputFormat implements InputFormat implements InputFormat implements InputFormat files, long length) + throws IOException { + /* Walks the directory at path recursively: each accepted file is appended to files and its size is added to length; the updated total is returned. */ final FileSystem fs = path.getFileSystem(); + + for(FileStatus dir: fs.listStatus(path)) { + if (dir.isDir()) { + /* length is passed by value, so the recursive call's result must be assigned back; otherwise the sizes of files in nested directories are dropped from the total. */ length = addNestedFiles(dir.getPath(), files, length); + } + else { + if(acceptFile(dir)) { + files.add(dir); + length += dir.getLen(); + testForUnsplittable(dir); + } + } + } + return length; + } + private boolean testForUnsplittable(FileStatus pathFile) { if(pathFile.getPath().getName().endsWith(DEFLATE_SUFFIX)) { unsplittable = true; @@ -807,6 +853,11 @@ public abstract class FileInputFormat implements InputFormat { + private static final long serialVersionUID = 1L; + + @Override + public boolean reachedEnd() throws IOException { + return true; + } + + @Override + public IntValue nextRecord(IntValue reuse) throws IOException { + return null; + } + } +} \ No newline at end of file diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java index d6c21f40eca..58154e50cd5 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java @@ -18,8 +18,10 @@ package org.apache.flink.api.common.io; +import java.io.BufferedOutputStream; 
import java.io.BufferedWriter; import java.io.File; +import java.io.FileOutputStream; import java.io.FileWriter; import java.io.IOException; import java.net.URI; @@ -288,6 +290,55 @@ public class FileInputFormatTest { Assert.fail(e.getMessage()); } } + + /** Writes four files of 2048 bytes each into a fresh directory under java.io.tmpdir, two of them with an underscore-prefixed name ({@code _luigi}, {@code _SUCCESS}), and expects the statistics to report a total input size of 2 * 2048 bytes, i.e. the underscore-prefixed files are not counted. Any exception fails the test. */ @Test + public void testGetStatsIgnoredUnderscoreFiles() { + try { + final long SIZE = 2048; + final long TOTAL = 2*SIZE; + + // create two accepted and two ignored files + File tempDir = new File(System.getProperty("java.io.tmpdir")); + File f = null; + do { + f = new File(tempDir, TestFileUtils.randomFileName("")); + } while (f.exists()); + f.mkdirs(); + f.deleteOnExit(); + + File child1 = new File(f, "dataFile1.txt"); + File child2 = new File(f, "another_file.bin"); + File luigiFile = new File(f, "_luigi"); + File success = new File(f, "_SUCCESS"); + + File[] files = { child1, child2, luigiFile, success }; + + for (File child : files) { + child.deleteOnExit(); + + BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child)); + try { + for (long bytes = SIZE; bytes > 0; bytes--) { + out.write(0); + } + } finally { + out.close(); + } + } + final DummyFileInputFormat format = new DummyFileInputFormat(); + format.setFilePath(f.toURI().toString()); + format.configure(new Configuration()); + + // check that only valid files are used for statistics computation + BaseStatistics stats = format.getStatistics(null); + Assert.assertEquals(TOTAL, stats.getTotalInputSize()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } // ------------------------------------------------------------------------ diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java index 83835ca46cf..683bc4d30e1 100644 --- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java +++ 
b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java @@ -63,7 +63,27 @@ public class TestFileUtils { } return f.toURI().toString(); } - + + /** Creates a file inside {@code dir} under a not-yet-existing name obtained from {@code randomFileName()}, creating missing parent directories, writes {@code bytes} zero bytes into it, registers it for deletion on JVM exit, and returns the file's URI as a string. */ public static String createTempFileInDirectory(String dir, long bytes) throws IOException { + File f; + do { + f = new File(dir + "/" + randomFileName()); + } while (f.exists()); + f.getParentFile().mkdirs(); + f.createNewFile(); + f.deleteOnExit(); + + BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(f)); + try { + for (; bytes > 0; bytes--) { + out.write(0); + } + } finally { + out.close(); + } + return f.toURI().toString(); + } + public static String createTempFile(String contents) throws IOException { File f = File.createTempFile(FILE_PREFIX, FILE_SUFFIX); f.deleteOnExit(); -- GitLab