提交 e47365bc 编写于 作者: F fel 提交者: Fabian Hueske

[FLINK-1208] Enable CsvInputFormats to ignore invalid lines and lines starting with comments

This closes #201
上级 e0a4ee07
......@@ -20,6 +20,9 @@ package org.apache.flink.api.java.io;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Map;
import java.util.TreeMap;
......@@ -29,7 +32,10 @@ import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
......@@ -37,6 +43,11 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
private static final long serialVersionUID = 1L;
/**
* The log.
*/
private static final Logger LOG = LoggerFactory.getLogger(CsvInputFormat.class);
public static final String DEFAULT_LINE_DELIMITER = "\n";
public static final char DEFAULT_FIELD_DELIMITER = ',';
......@@ -44,10 +55,16 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
private transient Object[] parsedValues;
private byte[] commentPrefix = null;
// To speed up readRecord processing. Used to find windows line endings.
// It is set when open so that readRecord does not have to evaluate it
private boolean lineDelimiterIsLinebreak = false;
private transient int commentCount;
private transient int invalidLineCount;
public CsvInputFormat(Path filePath) {
super(filePath);
......@@ -66,6 +83,48 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
setFieldTypes(types);
}
/**
 * Returns the byte sequence that marks a line as a comment, or {@code null}
 * when comment skipping is disabled.
 *
 * @return the configured comment prefix bytes, or {@code null}.
 */
public byte[] getCommentPrefix() {
	return this.commentPrefix;
}
/**
 * Sets the raw byte sequence that marks a line as a comment.
 * Passing {@code null} disables comment skipping.
 *
 * @param commentPrefix The comment prefix as bytes, or {@code null} to disable.
 */
public void setCommentPrefix(byte[] commentPrefix) {
this.commentPrefix = commentPrefix;
}
/**
 * Sets a single character as the comment prefix.
 * Convenience overload that delegates to {@link #setCommentPrefix(String)}.
 *
 * @param commentPrefix The character that starts a comment line.
 */
public void setCommentPrefix(char commentPrefix) {
setCommentPrefix(String.valueOf(commentPrefix));
}
/**
 * Sets the comment prefix string, encoded as UTF-8.
 * Delegates to {@link #setCommentPrefix(String, Charset)} with {@code Charsets.UTF_8}.
 *
 * @param commentPrefix The string that starts a comment line, or {@code null} to disable.
 */
public void setCommentPrefix(String commentPrefix) {
setCommentPrefix(commentPrefix, Charsets.UTF_8);
}
/**
 * Sets the comment prefix string, encoded with the charset identified by name.
 * The charset name is validated even when the prefix is {@code null}.
 *
 * @param commentPrefix The string that starts a comment line, or {@code null} to disable.
 * @param charsetName The name of the charset used to encode the prefix; must not be {@code null}.
 * @throws IllegalCharsetNameException if the charset name is malformed.
 * @throws UnsupportedCharsetException if the charset is not supported by the JVM.
 */
public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
	if (charsetName == null) {
		throw new IllegalArgumentException("Charset name must not be null");
	}
	if (commentPrefix == null) {
		this.commentPrefix = null;
		return;
	}
	setCommentPrefix(commentPrefix, Charset.forName(charsetName));
}
/**
 * Sets the comment prefix string, encoded with the given charset.
 *
 * @param commentPrefix The string that starts a comment line, or {@code null} to disable.
 * @param charset The charset used to encode the prefix; must not be {@code null}.
 */
public void setCommentPrefix(String commentPrefix, Charset charset) {
	if (charset == null) {
		throw new IllegalArgumentException("Charset must not be null");
	}
	// A null prefix disables comment skipping entirely.
	this.commentPrefix = (commentPrefix == null) ? null : commentPrefix.getBytes(charset);
}
public void setFieldTypes(Class<?> ... fieldTypes) {
if (fieldTypes == null || fieldTypes.length == 0) {
throw new IllegalArgumentException("Field types must not be null or empty.");
......@@ -117,10 +176,39 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
this.lineDelimiterIsLinebreak = true;
}
this.commentCount = 0;
this.invalidLineCount = 0;
}
/**
 * Closes the input format after logging how many invalid and commented lines
 * were skipped while reading the current split, then delegates to the
 * superclass to release the underlying stream.
 *
 * @throws IOException if closing the underlying stream fails.
 */
@Override
public void close() throws IOException {
	if (this.invalidLineCount > 0) {
		// Parameterized SLF4J logging: the message is only assembled when WARN
		// is enabled, so the explicit isWarnEnabled() guard is unnecessary.
		LOG.warn("In file \"{}\" (split start: {}) {} invalid line(s) were skipped.",
				this.filePath, this.splitStart, this.invalidLineCount);
	}
	if (this.commentCount > 0) {
		LOG.info("In file \"{}\" (split start: {}) {} comment line(s) were skipped.",
				this.filePath, this.splitStart, this.commentCount);
	}
	super.close();
}
/**
 * Reads the next record, transparently skipping lines that were dropped as
 * comments or (in lenient mode) as invalid, until a record is produced or
 * the end of the split is reached.
 *
 * @param record The object to reuse for the parsed record.
 * @return The next valid record, or {@code null} if the split is exhausted.
 * @throws IOException if reading from the split fails.
 */
@Override
public OUT nextRecord(OUT record) throws IOException {
	while (true) {
		OUT next = super.nextRecord(record);
		if (next != null || reachedEnd()) {
			return next;
		}
	}
}
@Override
public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
/*
* Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
*/
......@@ -130,6 +218,21 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
numBytes--;
}
if (commentPrefix != null && commentPrefix.length <= numBytes) {
//check record for comments
boolean isComment = true;
for (int i = 0; i < commentPrefix.length; i++) {
if (commentPrefix[i] != bytes[offset + i]) {
isComment = false;
break;
}
}
if (isComment) {
this.commentCount++;
return null;
}
}
if (parseRecord(parsedValues, bytes, offset, numBytes)) {
// valid parse, map values into pact record
for (int i = 0; i < parsedValues.length; i++) {
......@@ -137,6 +240,7 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
}
return reuse;
} else {
this.invalidLineCount++;
return null;
}
}
......
......@@ -50,9 +50,13 @@ public class CsvReader {
protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER;
protected char fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
protected String commentPrefix = null; //default: no comments
protected boolean skipFirstLineAsHeader = false;
protected boolean ignoreInvalidLines = false;
// --------------------------------------------------------------------------------------------
public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
......@@ -101,6 +105,23 @@ public class CsvReader {
return this;
}
/**
 * Configures the string that starts comments.
 * By default comments will be treated as invalid lines.
 * This function only recognizes comments which start at the beginning of the line!
 *
 * @param commentPrefix The string that starts the comments; must be non-null and non-empty.
 * @return The CSV reader instance itself, to allow for fluent function chaining.
 */
public CsvReader ignoreComments(String commentPrefix) {
	if (commentPrefix == null || commentPrefix.isEmpty()) {
		throw new IllegalArgumentException("The comment prefix must not be null or an empty string");
	}
	this.commentPrefix = commentPrefix;
	return this;
}
/**
* Configures which fields of the CSV file should be included and which should be skipped. The
* parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
......@@ -212,6 +233,20 @@ public class CsvReader {
skipFirstLineAsHeader = true;
return this;
}
/**
 * Sets the CSV reader to silently skip any lines that cannot be parsed,
 * e.g. trailing empty lines, repeated header lines or comments, which would
 * otherwise cause a parse exception.
 *
 * @return The CSV reader instance itself, to allow for fluent function chaining.
 */
public CsvReader ignoreInvalidLines() {
	this.ignoreInvalidLines = true;
	return this;
}
/**
* Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of
* {@link Tuple}. The type information for the fields is obtained from the type class. The type
......@@ -246,7 +281,9 @@ public class CsvReader {
private void configureInputFormat(CsvInputFormat<?> format, Class<?>... types) {
format.setDelimiter(this.lineDelimiter);
format.setFieldDelimiter(this.fieldDelimiter);
format.setCommentPrefix(this.commentPrefix);
format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
format.setLenient(ignoreInvalidLines);
if (this.includedMask == null) {
format.setFieldTypes(types);
} else {
......
......@@ -18,6 +18,9 @@
package org.apache.flink.api.java.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
......@@ -45,6 +48,22 @@ public class CSVReaderTest {
Assert.assertTrue(reader.skipFirstLineAsHeader);
}
// Verifies that ignoreInvalidLines() flips the reader's lenient-parsing flag
// from its default of false to true.
@Test
public void testIgnoreInvalidLinesConfigure() {
CsvReader reader = getCsvReader();
Assert.assertFalse(reader.ignoreInvalidLines);
reader.ignoreInvalidLines();
Assert.assertTrue(reader.ignoreInvalidLines);
}
// Verifies that ignoreComments(String) stores the given prefix, starting from
// the default of no comment prefix (null).
@Test
public void testIgnoreComments() {
CsvReader reader = getCsvReader();
assertNull(reader.commentPrefix);
reader.ignoreComments("#");
assertEquals("#", reader.commentPrefix);
}
@Test
public void testIncludeFieldsDense() {
CsvReader reader = getCsvReader();
......
......@@ -53,6 +53,143 @@ public class CsvInputFormatTest {
private static final String FIRST_PART = "That is the first part";
private static final String SECOND_PART = "That is the second part";
// End-to-end check of lenient parsing: with setLenient(true), lines that do not
// match the Tuple3<String, Integer, Double> schema (the leading comment, the
// header line and the "//a comment" line) are silently skipped, while lines that
// happen to parse (like "#next|5|6.0|") are still returned as records.
@Test
public void ignoreInvalidLines() {
try {
final String fileContent = "#description of the data\n" +
"header1|header2|header3|\n"+
"this is|1|2.0|\n"+
"//a comment\n" +
"a test|3|4.0|\n" +
"#next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
CsvInputFormat<Tuple3<String, Integer, Double>> format =
new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
format.setLenient(true);
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
// First parseable line.
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
assertEquals(new Integer(1), result.f1);
assertEquals(new Double(2.0), result.f2);
// "//a comment" is skipped as invalid; next record is "a test".
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
assertEquals(new Integer(3), result.f1);
assertEquals(new Double(4.0), result.f2);
// No comment prefix is configured, so "#next|5|6.0|" parses as a record.
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("#next", result.f0);
assertEquals(new Integer(5), result.f1);
assertEquals(new Double(6.0), result.f2);
// End of input.
result = format.nextRecord(result);
assertNull(result);
}
catch (Exception ex) {
ex.printStackTrace();
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
// Verifies that with a single-character comment prefix ("#") every line starting
// with it is skipped — including "#next|5|6.0|", which would otherwise parse —
// and only the two data lines are returned.
@Test
public void ignoreSingleCharPrefixComments() {
try {
final String fileContent = "#description of the data\n" +
"#successive commented line\n" +
"this is|1|2.0|\n" +
"a test|3|4.0|\n" +
"#next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
CsvInputFormat<Tuple3<String, Integer, Double>> format =
new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
format.setCommentPrefix("#");
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
// Two leading comment lines are skipped; first record is "this is".
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
assertEquals(new Integer(1), result.f1);
assertEquals(new Double(2.0), result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
assertEquals(new Integer(3), result.f1);
assertEquals(new Double(4.0), result.f2);
// Trailing comment line is skipped, so the input is exhausted.
result = format.nextRecord(result);
assertNull(result);
}
catch (Exception ex) {
ex.printStackTrace();
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
// Same as the single-char test, but with a multi-character comment prefix ("//")
// to exercise the byte-by-byte prefix comparison over more than one byte.
@Test
public void ignoreMultiCharPrefixComments() {
try {
final String fileContent = "//description of the data\n" +
"//successive commented line\n" +
"this is|1|2.0|\n"+
"a test|3|4.0|\n" +
"//next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
CsvInputFormat<Tuple3<String, Integer, Double>> format =
new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
format.setCommentPrefix("//");
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
assertEquals(new Integer(1), result.f1);
assertEquals(new Double(2.0), result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
assertEquals(new Integer(3), result.f1);
assertEquals(new Double(4.0), result.f2);
// All comment lines skipped; only two records in total.
result = format.nextRecord(result);
assertNull(result);
}
catch (Exception ex) {
ex.printStackTrace();
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void readStringFields() {
......
......@@ -19,6 +19,7 @@
package org.apache.flink.api.scala.operators;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
......@@ -30,7 +31,13 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Map;
import java.util.TreeMap;
......@@ -39,6 +46,8 @@ import scala.Product;
public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFormat<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(ScalaCsvInputFormat.class);
private transient Object[] parsedValues;
......@@ -47,7 +56,12 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
private boolean lineDelimiterIsLinebreak = false;
private final TupleSerializerBase<OUT> serializer;
private byte[] commentPrefix = null;
private transient int commentCount;
private transient int invalidLineCount;
public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
super(filePath);
......@@ -80,6 +94,72 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
setFieldsGeneric(sourceFieldIndices, fieldTypes);
}
/**
 * Returns the byte sequence that marks a line as a comment, or {@code null}
 * when comment skipping is disabled.
 *
 * @return the configured comment prefix bytes, or {@code null}.
 */
public byte[] getCommentPrefix() {
	return this.commentPrefix;
}
/**
 * Sets the raw byte sequence that marks a line as a comment.
 * Passing {@code null} disables comment skipping.
 *
 * @param commentPrefix The comment prefix as bytes, or {@code null} to disable.
 */
public void setCommentPrefix(byte[] commentPrefix) {
this.commentPrefix = commentPrefix;
}
/**
 * Sets a single character as the comment prefix.
 * Convenience overload that delegates to {@link #setCommentPrefix(String)}.
 *
 * @param commentPrefix The character that starts a comment line.
 */
public void setCommentPrefix(char commentPrefix) {
setCommentPrefix(String.valueOf(commentPrefix));
}
/**
 * Sets the comment prefix string, encoded as UTF-8.
 * Delegates to {@link #setCommentPrefix(String, Charset)} with {@code Charsets.UTF_8}.
 *
 * @param commentPrefix The string that starts a comment line, or {@code null} to disable.
 */
public void setCommentPrefix(String commentPrefix) {
setCommentPrefix(commentPrefix, Charsets.UTF_8);
}
/**
 * Sets the comment prefix string, encoded with the charset identified by name.
 * The charset name is validated even when the prefix is {@code null}.
 *
 * @param commentPrefix The string that starts a comment line, or {@code null} to disable.
 * @param charsetName The name of the charset used to encode the prefix; must not be {@code null}.
 * @throws IllegalCharsetNameException if the charset name is malformed.
 * @throws UnsupportedCharsetException if the charset is not supported by the JVM.
 */
public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
	if (charsetName == null) {
		throw new IllegalArgumentException("Charset name must not be null");
	}
	if (commentPrefix == null) {
		this.commentPrefix = null;
		return;
	}
	setCommentPrefix(commentPrefix, Charset.forName(charsetName));
}
/**
 * Sets the comment prefix string, encoded with the given charset.
 *
 * @param commentPrefix The string that starts a comment line, or {@code null} to disable.
 * @param charset The charset used to encode the prefix; must not be {@code null}.
 */
public void setCommentPrefix(String commentPrefix, Charset charset) {
	if (charset == null) {
		throw new IllegalArgumentException("Charset must not be null");
	}
	// A null prefix disables comment skipping entirely.
	this.commentPrefix = (commentPrefix == null) ? null : commentPrefix.getBytes(charset);
}
/**
 * Closes the input format after logging how many invalid and commented lines
 * were skipped while reading the current split, then delegates to the
 * superclass to release the underlying stream.
 *
 * @throws IOException if closing the underlying stream fails.
 */
@Override
public void close() throws IOException {
	if (this.invalidLineCount > 0) {
		// Parameterized SLF4J logging: the message is only assembled when WARN
		// is enabled, so the explicit isWarnEnabled() guard is unnecessary.
		LOG.warn("In file \"{}\" (split start: {}) {} invalid line(s) were skipped.",
				this.filePath, this.splitStart, this.invalidLineCount);
	}
	if (this.commentCount > 0) {
		LOG.info("In file \"{}\" (split start: {}) {} comment line(s) were skipped.",
				this.filePath, this.splitStart, this.commentCount);
	}
	super.close();
}
/**
 * Reads the next record, transparently skipping lines that were dropped as
 * comments or (in lenient mode) as invalid, until a record is produced or
 * the end of the split is reached.
 *
 * @param record The object to reuse for the parsed record.
 * @return The next valid record, or {@code null} if the split is exhausted.
 * @throws IOException if reading from the split fails.
 */
@Override
public OUT nextRecord(OUT record) throws IOException {
	while (true) {
		OUT next = super.nextRecord(record);
		if (next != null || reachedEnd()) {
			return next;
		}
	}
}
@Override
public void open(FileInputSplit split) throws IOException {
......@@ -98,6 +178,9 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
for (int i = 0; i < fieldParsers.length; i++) {
this.parsedValues[i] = fieldParsers[i].createValue();
}
this.commentCount = 0;
this.invalidLineCount = 0;
// left to right evaluation makes access [0] okay
// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
......@@ -116,11 +199,27 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
//reduce the number of bytes so that the Carriage return is not taken as data
numBytes--;
}
if (commentPrefix != null && commentPrefix.length <= numBytes) {
//check record for comments
boolean isComment = true;
for (int i = 0; i < commentPrefix.length; i++) {
if (commentPrefix[i] != bytes[offset + i]) {
isComment = false;
break;
}
}
if (isComment) {
this.commentCount++;
return null;
}
}
if (parseRecord(parsedValues, bytes, offset, numBytes)) {
OUT result = serializer.createInstance(parsedValues);
return result;
} else {
this.invalidLineCount++;
return null;
}
}
......
......@@ -155,6 +155,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* @param lineDelimiter The string that separates lines, defaults to newline.
* @param fieldDelimiter The char that separates individual fields, defaults to ','.
* @param ignoreFirstLine Whether the first line in the file should be ignored.
* @param ignoreComments Lines that start with the given String are ignored, disabled by default.
* @param lenient Whether the parser should silently ignore malformed lines.
* @param includedFields The fields in the file that should be read. Per default all fields
* are read.
......@@ -164,6 +165,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
lineDelimiter: String = "\n",
fieldDelimiter: Char = ',',
ignoreFirstLine: Boolean = false,
ignoreComments: String = null,
lenient: Boolean = false,
includedFields: Array[Int] = null): DataSet[T] = {
......@@ -174,6 +176,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
inputFormat.setFieldDelimiter(fieldDelimiter)
inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
inputFormat.setLenient(lenient)
inputFormat.setCommentPrefix(ignoreComments)
val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity)
for (i <- 0 until typeInfo.getArity) {
......
......@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.io
import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
import org.junit.Assert._
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
......@@ -28,8 +29,7 @@ import java.io.FileOutputStream
import java.io.FileWriter
import java.io.OutputStreamWriter
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileInputSplit
import org.apache.flink.core.fs.Path
import org.apache.flink.core.fs.{FileInputSplit, Path}
import org.junit.Test
import org.apache.flink.api.scala._
......@@ -39,6 +39,88 @@ class CsvInputFormatTest {
private final val FIRST_PART: String = "That is the first part"
private final val SECOND_PART: String = "That is the second part"
// Verifies that the Scala CSV input format skips every line starting with the
// configured single-character comment prefix ("#"), including the trailing
// "#next|5|6.0|" line that would otherwise parse as a record.
@Test
def ignoreSingleCharPrefixComments():Unit = {
try {
val fileContent = "#description of the data\n" +
"#successive commented line\n" +
"this is|1|2.0|\n" +
"a test|3|4.0|\n" +
"#next|5|6.0|\n"
val split = createTempFile(fileContent)
val format = new ScalaCsvInputFormat[(String, Integer, Double)](
PATH, createTypeInformation[(String, Integer, Double)])
format.setDelimiter("\n")
format.setFieldDelimiter('|')
format.setCommentPrefix("#")
val parameters = new Configuration
format.configure(parameters)
format.open(split)
var result: (String, Integer, Double) = null
// Two leading comment lines are skipped; first record is "this is".
result = format.nextRecord(result)
assertNotNull(result)
assertEquals("this is", result._1)
assertEquals(new Integer(1), result._2)
assertEquals(2.0, result._3, 0.0001)
result = format.nextRecord(result)
assertNotNull(result)
assertEquals("a test", result._1)
assertEquals(new Integer(3), result._2)
assertEquals(4.0, result._3, 0.0001)
// Trailing comment line is skipped; the split is fully consumed.
result = format.nextRecord(result)
assertNull(result)
assertTrue(format.reachedEnd)
}
catch {
case ex: Exception => {
ex.printStackTrace
fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
}
}
}
// Same as the single-char test, but with a multi-character prefix ("//") to
// exercise the byte-by-byte prefix comparison over more than one byte.
@Test
def ignoreMultiCharPrefixComments():Unit = {
try {
val fileContent = "//description of the data\n" +
"//successive commented line\n" +
"this is|1|2.0|\n" +
"a test|3|4.0|\n" +
"//next|5|6.0|\n"
val split = createTempFile(fileContent)
val format = new ScalaCsvInputFormat[(String, Integer, Double)](
PATH, createTypeInformation[(String, Integer, Double)])
format.setDelimiter("\n")
format.setFieldDelimiter('|')
format.setCommentPrefix("//")
val parameters = new Configuration
format.configure(parameters)
format.open(split)
var result: (String, Integer, Double) = null
result = format.nextRecord(result)
assertNotNull(result)
assertEquals("this is", result._1)
assertEquals(new Integer(1), result._2)
assertEquals(2.0, result._3, 0.0001)
result = format.nextRecord(result)
assertNotNull(result)
assertEquals("a test", result._1)
assertEquals(new Integer(3), result._2)
assertEquals(4.0, result._3, 0.0001)
// All comment lines skipped; only two records in total.
result = format.nextRecord(result)
assertNull(result)
assertTrue(format.reachedEnd)
}
catch {
case ex: Exception => {
ex.printStackTrace
fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
}
}
}
@Test
def readStringFields():Unit = {
try {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册