提交 6c261d3a 编写于 作者: S StephanEwen

CsvInputFormat can optionally skip an initial header line.

上级 6344e573
......@@ -467,7 +467,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
// --------------------------------------------------------------------------------------------
private boolean readLine() throws IOException {
protected final boolean readLine() throws IOException {
if (this.stream == null || this.overLimit) {
return false;
}
......
......@@ -55,7 +55,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private char fieldDelim = DEFAULT_FIELD_DELIMITER;
private boolean lenient = false;
private boolean lenient;
private boolean skipFirstLineAsHeader;
// --------------------------------------------------------------------------------------------
......@@ -137,7 +139,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
"Number of field indices and field types must match.");
for (int i : sourceFieldIndices) {
if ( i < 0) {
if (i < 0) {
throw new IllegalArgumentException("Field indices must not be smaller than zero.");
}
}
......@@ -152,7 +154,8 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
if (type != null) {
if (FieldParser.getParserForType(type) == null) {
throw new IllegalArgumentException("The type '" + type.getName() + "' is not supported for the CSV input format.");
throw new IllegalArgumentException("The type '" + type.getName()
+ "' is not supported for the CSV input format.");
}
types.add(type);
fieldIncluded[sourceFieldIndices[i]] = true;
......@@ -191,6 +194,14 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
this.lenient = lenient;
}
public boolean isSkippingFirstLineAsHeader() {
return skipFirstLineAsHeader;
}
public void setSkipFirstLineAsHeader(boolean skipFirstLine) {
this.skipFirstLineAsHeader = skipFirstLine;
}
protected FieldParser<Value>[] getFieldParsers() {
return this.fieldParsers;
}
......@@ -220,6 +231,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
}
this.fieldParsers = parsers;
// skip the first line, if we are at the beginning of a file and have the option set
if (this.skipFirstLineAsHeader && this.splitStart == 0) {
readLine(); // read and ignore
}
}
protected boolean parseRecord(Value[] valueHolders, byte[] bytes, int offset, int numBytes) throws ParseException {
......@@ -270,7 +286,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
field += (skipCnt - 1);
}
else if (lenient) {
// no valid line, but we go on
// no valid line, but we do not report an exception, simply skip the line
return false;
}
else {
......
......@@ -5,20 +5,18 @@ import java.io.IOException;
import junit.framework.Assert;
import org.apache.log4j.Level;
import org.junit.BeforeClass;
import org.junit.Test;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.util.LogUtils;
import eu.stratosphere.util.OperatingSystem;
public class FileOutputFormatTest {
@BeforeClass
public static void initialize() {
LogUtils.initializeDefaultConsoleLogger(Level.WARN);
LogUtils.initializeDefaultTestConsoleLogger();
}
@Test
......
......@@ -206,8 +206,8 @@ public class GenericCsvInputFormatTest {
final Configuration parameters = new Configuration();
format.setFieldDelim('|');
format.setFields(new int[]{0, 3, 7},
(Class<? extends Value>[]) new Class[]{IntValue.class, IntValue.class, IntValue.class});
format.setFields(new int[] { 0, 3, 7 },
(Class<? extends Value>[]) new Class[] { IntValue.class, IntValue.class, IntValue.class });
format.configure(parameters);
format.open(split);
......@@ -225,8 +225,7 @@ public class GenericCsvInputFormatTest {
assertFalse(format.nextRecord(values));
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
} catch (Exception ex) {
System.err.println(ex.getMessage());
ex.printStackTrace();
Assert.fail("Test erroneous");
......@@ -346,6 +345,37 @@ public class GenericCsvInputFormatTest {
}
}
@SuppressWarnings("unchecked")
@Test
public void testReadInvalidContentsLenientWithSkipping() {
try {
final String fileContent = "abc|dfgsdf|777|444\n" + // good line
"kkz|777|foobar|hhg\n" + // wrong data type in field
"kkz|777foobarhhg \n" + // too short, a skipped field never ends
"xyx|ignored|42|\n"; // another good line
final FileInputSplit split = createTempFile(fileContent);
final Configuration parameters = new Configuration();
format.setFieldDelim('|');
format.setFieldTypes(StringValue.class, null, IntValue.class);
format.setLenient(true);
format.configure(parameters);
format.open(split);
Value[] values = new Value[] { new StringValue(), new IntValue()};
assertTrue(format.nextRecord(values));
assertFalse(format.nextRecord(values));
assertFalse(format.nextRecord(values));
assertTrue(format.nextRecord(values));
}
catch (Exception ex) {
Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
}
}
@SuppressWarnings("unchecked")
@Test
public void readWithEmptyField() {
......@@ -384,6 +414,78 @@ public class GenericCsvInputFormatTest {
}
}
@SuppressWarnings("unchecked")
@Test
public void readWithHeaderLine() {
try {
final String fileContent = "colname-1|colname-2|some name 3|column four|\n" +
"123|abc|456|def|\n"+
"987|xyz|654|pqr|\n";
final FileInputSplit split = createTempFile(fileContent);
final Configuration parameters = new Configuration();
format.setFieldDelim('|');
format.setFieldTypes(IntValue.class, StringValue.class, IntValue.class, StringValue.class);
format.setSkipFirstLineAsHeader(true);
format.configure(parameters);
format.open(split);
Value[] values = new Value[] { new IntValue(), new StringValue(), new IntValue(), new StringValue()};
// first line is skipped as header
assertTrue(format.nextRecord(values)); // first row (= second line)
assertTrue(format.nextRecord(values)); // second row (= third line)
assertFalse(format.nextRecord(values)); // exhausted
assertTrue(format.reachedEnd()); // exhausted
}
catch (Exception ex) {
Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
}
}
@SuppressWarnings("unchecked")
@Test
public void readWithHeaderLineAndInvalidIntermediate() {
try {
final String fileContent = "colname-1|colname-2|some name 3|column four|\n" +
"123|abc|456|def|\n"+
"colname-1|colname-2|some name 3|column four|\n" + // repeated header in the middle
"987|xyz|654|pqr|\n";
final FileInputSplit split = createTempFile(fileContent);
final Configuration parameters = new Configuration();
format.setFieldDelim('|');
format.setFieldTypes(IntValue.class, StringValue.class, IntValue.class, StringValue.class);
format.setSkipFirstLineAsHeader(true);
format.configure(parameters);
format.open(split);
Value[] values = new Value[] { new IntValue(), new StringValue(), new IntValue(), new StringValue()};
// first line is skipped as header
assertTrue(format.nextRecord(values)); // first row (= second line)
try {
format.nextRecord(values);
Assert.fail("Format accepted invalid line.");
}
catch (ParseException e) {
// as we expected
}
}
catch (Exception ex) {
Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
}
}
private FileInputSplit createTempFile(String content) throws IOException {
this.tempFile = File.createTempFile("test_contents", "tmp");
this.tempFile.deleteOnExit();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册