From e47365bc0eed07a57682ea75f71991e35981cc82 Mon Sep 17 00:00:00 2001 From: fel Date: Fri, 14 Nov 2014 12:48:24 +0100 Subject: [PATCH] [FLINK-1208] Enable CsvInputFormats to ignore invalid lines and lines starting with comments This closes #201 --- .../flink/api/java/io/CsvInputFormat.java | 106 +++++++++++++- .../apache/flink/api/java/io/CsvReader.java | 37 +++++ .../flink/api/java/io/CSVReaderTest.java | 19 +++ .../flink/api/java/io/CsvInputFormatTest.java | 137 ++++++++++++++++++ .../scala/operators/ScalaCsvInputFormat.java | 101 ++++++++++++- .../api/scala/ExecutionEnvironment.scala | 3 + .../api/scala/io/CsvInputFormatTest.scala | 86 ++++++++++- 7 files changed, 485 insertions(+), 4 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java index a8515178f18..fe2ae14c236 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java @@ -20,6 +20,9 @@ package org.apache.flink.api.java.io; import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; +import java.nio.charset.UnsupportedCharsetException; import java.util.Map; import java.util.TreeMap; @@ -29,7 +32,10 @@ import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.types.parser.FieldParser; import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.base.Charsets; import com.google.common.base.Preconditions; @@ -37,6 +43,11 @@ public class CsvInputFormat extends GenericCsvInputFormat extends GenericCsvInputFormat extends GenericCsvInputFormat ... fieldTypes) { if (fieldTypes == null || fieldTypes.length == 0) { throw new IllegalArgumentException("Field types must not be null or empty."); @@ -117,10 +176,39 @@ public class CsvInputFormat extends GenericCsvInputFormat 0) { + if (LOG.isWarnEnabled()) { + LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped."); + } + } + + if (this.commentCount > 0) { + if (LOG.isInfoEnabled()) { + LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped."); + } + } + super.close(); + } + + @Override + public OUT nextRecord(OUT record) throws IOException { + OUT returnRecord = null; + do { + returnRecord = super.nextRecord(record); + } while (returnRecord == null && !reachedEnd()); + + return returnRecord; } @Override - public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) { + public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException { /* * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n */ @@ -130,6 +218,21 @@ public class CsvInputFormat extends GenericCsvInputFormat extends GenericCsvInputFormat format, Class... types) { format.setDelimiter(this.lineDelimiter); format.setFieldDelimiter(this.fieldDelimiter); + format.setCommentPrefix(this.commentPrefix); format.setSkipFirstLineAsHeader(skipFirstLineAsHeader); + format.setLenient(ignoreInvalidLines); if (this.includedMask == null) { format.setFieldTypes(types); } else { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java index 1217b3dde7e..6676cd1c3a8 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java @@ -18,6 +18,9 @@ package org.apache.flink.api.java.io; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + import java.util.Arrays; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -45,6 +48,22 @@ public class CSVReaderTest { Assert.assertTrue(reader.skipFirstLineAsHeader); } + @Test + public void testIgnoreInvalidLinesConfigure() { + CsvReader reader = getCsvReader(); + Assert.assertFalse(reader.ignoreInvalidLines); + reader.ignoreInvalidLines(); + Assert.assertTrue(reader.ignoreInvalidLines); + } + + @Test + public void testIgnoreComments() { + CsvReader reader = getCsvReader(); + assertNull(reader.commentPrefix); + reader.ignoreComments("#"); + assertEquals("#", reader.commentPrefix); + } + @Test public void testIncludeFieldsDense() { CsvReader reader = getCsvReader(); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java index 5f10a2b1a97..c335db1e270 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java @@ -53,6 +53,143 @@ public class CsvInputFormatTest { private static final String FIRST_PART = "That is the first part"; private static final String SECOND_PART = "That is the second part"; + + @Test + public void ignoreInvalidLines() { + try { + + + final String fileContent = "#description of the data\n" + + "header1|header2|header3|\n"+ + "this is|1|2.0|\n"+ + "//a comment\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + final FileInputSplit split = createTempFile(fileContent); + + CsvInputFormat> format = + new CsvInputFormat>(PATH, "\n", '|', String.class, Integer.class, Double.class); + format.setLenient(true); + + final Configuration parameters = new Configuration(); + format.configure(parameters); + format.open(split); + + + Tuple3 result = new Tuple3(); + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("this is", result.f0); + assertEquals(new Integer(1), result.f1); + assertEquals(new Double(2.0), result.f2); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a test", result.f0); + assertEquals(new Integer(3), result.f1); + assertEquals(new Double(4.0), result.f2); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("#next", result.f0); + assertEquals(new Integer(5), result.f1); + assertEquals(new Double(6.0), result.f2); + + result = format.nextRecord(result); + assertNull(result); + } + catch (Exception ex) { + ex.printStackTrace(); + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); + } + } + + @Test + public void ignoreSingleCharPrefixComments() { + try { + final String fileContent = "#description of the data\n" + + "#successive commented line\n" + + "this is|1|2.0|\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + final FileInputSplit split = createTempFile(fileContent); + + CsvInputFormat> format = + new CsvInputFormat>(PATH, "\n", '|', String.class, Integer.class, Double.class); + format.setCommentPrefix("#"); + + final Configuration parameters = new Configuration(); + format.configure(parameters); + format.open(split); + + Tuple3 result = new Tuple3(); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("this is", result.f0); + assertEquals(new Integer(1), result.f1); + assertEquals(new Double(2.0), result.f2); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a test", result.f0); + assertEquals(new Integer(3), result.f1); + assertEquals(new Double(4.0), result.f2); + + result = format.nextRecord(result); + assertNull(result); + } + catch (Exception ex) { + ex.printStackTrace(); + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); + } + } + + @Test + public void ignoreMultiCharPrefixComments() { + try { + + + final String fileContent = "//description of the data\n" + + "//successive commented line\n" + + "this is|1|2.0|\n"+ + "a test|3|4.0|\n" + + "//next|5|6.0|\n"; + + final FileInputSplit split = createTempFile(fileContent); + + CsvInputFormat> format = + new CsvInputFormat>(PATH, "\n", '|', String.class, Integer.class, Double.class); + format.setCommentPrefix("//"); + + final Configuration parameters = new Configuration(); + format.configure(parameters); + format.open(split); + + Tuple3 result = new Tuple3(); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("this is", result.f0); + assertEquals(new Integer(1), result.f1); + assertEquals(new Double(2.0), result.f2); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a test", result.f0); + assertEquals(new Integer(3), result.f1); + assertEquals(new Double(4.0), result.f2); + + result = format.nextRecord(result); + assertNull(result); + } + catch (Exception ex) { + ex.printStackTrace(); + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); + } + } @Test public void readStringFields() { diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java index abe46a0868f..97cbd5ca981 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java @@ -19,6 +19,7 @@ package org.apache.flink.api.scala.operators; +import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import org.apache.flink.api.common.io.GenericCsvInputFormat; @@ -30,7 +31,13 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.types.parser.FieldParser; import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; +import java.nio.charset.UnsupportedCharsetException; import java.util.Map; import java.util.TreeMap; @@ -39,6 +46,8 @@ import scala.Product; public class ScalaCsvInputFormat extends GenericCsvInputFormat { private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ScalaCsvInputFormat.class); private transient Object[] parsedValues; @@ -47,7 +56,12 @@ public class ScalaCsvInputFormat extends GenericCsvInputFor private boolean lineDelimiterIsLinebreak = false; private final TupleSerializerBase serializer; - + + private byte[] commentPrefix = null; + + private transient int commentCount; + private transient int invalidLineCount; + public ScalaCsvInputFormat(Path filePath, TypeInformation typeInfo) { super(filePath); @@ -80,6 +94,72 @@ public class ScalaCsvInputFormat extends GenericCsvInputFor setFieldsGeneric(sourceFieldIndices, fieldTypes); } + + public byte[] getCommentPrefix() { + return commentPrefix; + } + + public void setCommentPrefix(byte[] commentPrefix) { + this.commentPrefix = commentPrefix; + } + + public void setCommentPrefix(char commentPrefix) { + setCommentPrefix(String.valueOf(commentPrefix)); + } + + public void setCommentPrefix(String commentPrefix) { + setCommentPrefix(commentPrefix, Charsets.UTF_8); + } + + public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException { + if (charsetName == null) { + throw new IllegalArgumentException("Charset name must not be null"); + } + + if (commentPrefix != null) { + Charset charset = Charset.forName(charsetName); + setCommentPrefix(commentPrefix, charset); + } else { + this.commentPrefix = null; + } + } + + public void setCommentPrefix(String commentPrefix, Charset charset) { + if (charset == null) { + throw new IllegalArgumentException("Charset must not be null"); + } + if (commentPrefix != null) { + this.commentPrefix = commentPrefix.getBytes(charset); + } else { + this.commentPrefix = null; + } + } + + @Override + public void close() throws IOException { + if (this.invalidLineCount > 0) { + if (LOG.isWarnEnabled()) { + LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped."); + } + } + + if (this.commentCount > 0) { + if (LOG.isInfoEnabled()) { + LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped."); + } + } + super.close(); + } + + @Override + public OUT nextRecord(OUT record) throws IOException { + OUT returnRecord = null; + do { + returnRecord = super.nextRecord(record); + } while (returnRecord == null && !reachedEnd()); + + return returnRecord; + } @Override public void open(FileInputSplit split) throws IOException { @@ -98,6 +178,9 @@ public class ScalaCsvInputFormat extends GenericCsvInputFor for (int i = 0; i < fieldParsers.length; i++) { this.parsedValues[i] = fieldParsers[i].createValue(); } + + this.commentCount = 0; + this.invalidLineCount = 0; // left to right evaluation makes access [0] okay // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default @@ -116,11 +199,27 @@ public class ScalaCsvInputFormat extends GenericCsvInputFor //reduce the number of bytes so that the Carriage return is not taken as data numBytes--; } + + if (commentPrefix != null && commentPrefix.length <= numBytes) { + //check record for comments + boolean isComment = true; + for (int i = 0; i < commentPrefix.length; i++) { + if (commentPrefix[i] != bytes[offset + i]) { + isComment = false; + break; + } + } + if (isComment) { + this.commentCount++; + return null; + } + } if (parseRecord(parsedValues, bytes, offset, numBytes)) { OUT result = serializer.createInstance(parsedValues); return result; } else { + this.invalidLineCount++; return null; } } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index 4792e588f88..e756e78fc5b 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -155,6 +155,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { * @param lineDelimiter The string that separates lines, defaults to newline. * @param fieldDelimiter The char that separates individual fields, defaults to ','. * @param ignoreFirstLine Whether the first line in the file should be ignored. + * @param ignoreComments Lines that start with the given String are ignored, disabled by default. * @param lenient Whether the parser should silently ignore malformed lines. * @param includedFields The fields in the file that should be read. Per default all fields * are read. @@ -164,6 +165,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { lineDelimiter: String = "\n", fieldDelimiter: Char = ',', ignoreFirstLine: Boolean = false, + ignoreComments: String = null, lenient: Boolean = false, includedFields: Array[Int] = null): DataSet[T] = { @@ -174,6 +176,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { inputFormat.setFieldDelimiter(fieldDelimiter) inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine) inputFormat.setLenient(lenient) + inputFormat.setCommentPrefix(ignoreComments) val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity) for (i <- 0 until typeInfo.getArity) { diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala index 9c907885c6a..54314f7a6cf 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.io import org.apache.flink.api.scala.operators.ScalaCsvInputFormat +import org.junit.Assert._ import org.junit.Assert.assertEquals import org.junit.Assert.assertNotNull import org.junit.Assert.assertNull @@ -28,8 +29,7 @@ import java.io.FileOutputStream import java.io.FileWriter import java.io.OutputStreamWriter import org.apache.flink.configuration.Configuration -import org.apache.flink.core.fs.FileInputSplit -import org.apache.flink.core.fs.Path +import org.apache.flink.core.fs.{FileInputSplit, Path} import org.junit.Test import org.apache.flink.api.scala._ @@ -39,6 +39,88 @@ class CsvInputFormatTest { private final val FIRST_PART: String = "That is the first part" private final val SECOND_PART: String = "That is the second part" + + + @Test + def ignoreSingleCharPrefixComments():Unit = { + try { + val fileContent = "#description of the data\n" + + "#successive commented line\n" + + "this is|1|2.0|\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n" + val split = createTempFile(fileContent) + val format = new ScalaCsvInputFormat[(String, Integer, Double)]( + PATH, createTypeInformation[(String, Integer, Double)]) + format.setDelimiter("\n") + format.setFieldDelimiter('|') + format.setCommentPrefix("#") + val parameters = new Configuration + format.configure(parameters) + format.open(split) + var result: (String, Integer, Double) = null + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("this is", result._1) + assertEquals(new Integer(1), result._2) + assertEquals(2.0, result._3, 0.0001) + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("a test", result._1) + assertEquals(new Integer(3), result._2) + assertEquals(4.0, result._3, 0.0001) + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + catch { + case ex: Exception => { + ex.printStackTrace + fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage) + } + } + } + + @Test + def ignoreMultiCharPrefixComments():Unit = { + try { + val fileContent = "//description of the data\n" + + "//successive commented line\n" + + "this is|1|2.0|\n" + + "a test|3|4.0|\n" + + "//next|5|6.0|\n" + val split = createTempFile(fileContent) + val format = new ScalaCsvInputFormat[(String, Integer, Double)]( + PATH, createTypeInformation[(String, Integer, Double)]) + format.setDelimiter("\n") + format.setFieldDelimiter('|') + format.setCommentPrefix("//") + val parameters = new Configuration + format.configure(parameters) + format.open(split) + var result: (String, Integer, Double) = null + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("this is", result._1) + assertEquals(new Integer(1), result._2) + assertEquals(2.0, result._3, 0.0001) + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("a test", result._1) + assertEquals(new Integer(3), result._2) + assertEquals(4.0, result._3, 0.0001) + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + catch { + case ex: Exception => { + ex.printStackTrace + fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage) + } + } + } + @Test def readStringFields():Unit = { try { -- GitLab