diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java deleted file mode 100644 index a826a06e69e3fb4f0cd2582c1996b5ba1c8e555e..0000000000000000000000000000000000000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHRow WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.io; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.io.ParseException; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.typeutils.RowTypeInfo; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.FieldParser.ParseErrorState; - -@Internal -public class RowCsvInputFormat extends CsvInputFormat { - - private static final long serialVersionUID = 1L; - - private int arity; - - public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) { - this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo); - } - - public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) { - this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity())); - } - - public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) { - this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask); - } - - public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo, - int[] includedFieldsMask) { - this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity()) - : toBooleanMask(includedFieldsMask)); - } - - public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) { - this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask); - } - - public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo, - boolean[] includedFieldsMask) { - super(filePath); - if (rowTypeInfo.getArity() == 0) { - throw new IllegalArgumentException("Row arity must be greater than 0."); - } - - if (includedFieldsMask == null) { - includedFieldsMask = createDefaultMask(rowTypeInfo.getArity()); - } - - this.arity = rowTypeInfo.getArity(); - - setDelimiter(lineDelimiter); - setFieldDelimiter(fieldDelimiter); - - Class[] classes = new Class[rowTypeInfo.getArity()]; - - for (int i = 0; i < rowTypeInfo.getArity(); i++) { - classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass(); - } - - setFieldsGeneric(includedFieldsMask, classes); - } - - - @Override - public Row fillRecord(Row reuse, Object[] parsedValues) { - if (reuse == null) { - reuse = new Row(arity); - } - for (int i = 0; i < parsedValues.length; i++) { - reuse.setField(i, parsedValues[i]); - } - return reuse; - } - - @Override - protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException { - boolean[] fieldIncluded = this.fieldIncluded; - - int startPos = offset; - final int limit = offset + numBytes; - - for (int field = 0, output = 0; field < fieldIncluded.length; field++) { - - // check valid start position - if (startPos >= limit) { - if (isLenient()) { - return false; - } else { - throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)); - } - } - - if (fieldIncluded[field]) { - // parse field - @SuppressWarnings("unchecked") - FieldParser parser = (FieldParser) this.getFieldParsers()[output]; - int latestValidPos = startPos; - startPos = parser.resetErrorStateAndParse(bytes, startPos, limit, this.getFieldDelimiter(), holders[output]); - if (!isLenient() && parser.getErrorState() != ParseErrorState.NONE) { - // Row is able to handle null values - if (parser.getErrorState() != ParseErrorState.EMPTY_STRING) { - throw new ParseException( - String.format("Parsing error for column %s of row '%s' originated by %s: %s.", field, - new String(bytes, offset, numBytes), - parser.getClass().getSimpleName(), parser.getErrorState())); - } - } - holders[output] = parser.getLastResult(); - - // check parse result - if (startPos < 0) { - holders[output] = null; - startPos = skipFields(bytes, latestValidPos, limit, this.getFieldDelimiter()); - } - output++; - } else { - // skip field - startPos = skipFields(bytes, startPos, limit, this.getFieldDelimiter()); - } - } - return true; - } - -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala index fbdc7d52f0a59706667cae100baf5f08edd7bd2d..a31f199c77e25681df8ef47d7ec8537b32b87bbb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala @@ -19,14 +19,14 @@ package org.apache.flink.api.table.plan.nodes.dataset import com.google.common.collect.ImmutableList -import org.apache.calcite.plan.{RelTraitSet, 
RelOptCluster} -import org.apache.calcite.rel.{RelWriter, RelNode} +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Values import org.apache.calcite.rex.RexLiteral import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.runtime.ValuesInputFormat +import org.apache.flink.api.table.runtime.io.ValuesInputFormat import org.apache.flink.api.table.typeutils.RowTypeInfo import org.apache.flink.api.table.typeutils.TypeConverter._ import org.apache.flink.api.table.{BatchTableEnvironment, Row} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala index 4a3a704b557449e17b08edd074b1a4706fa2d425..3ae19ac524236cfeb93125e2882b6a44c77827b1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala @@ -25,8 +25,8 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Values import org.apache.calcite.rex.RexLiteral import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.runtime.io.ValuesInputFormat import org.apache.flink.api.table.{Row, StreamTableEnvironment} -import org.apache.flink.api.table.runtime.ValuesInputFormat import org.apache.flink.api.table.typeutils.RowTypeInfo import org.apache.flink.api.table.typeutils.TypeConverter._ import org.apache.flink.streaming.api.datastream.DataStream diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala new file mode 100644 index 0000000000000000000000000000000000000000..1eb056ccc27393ab6f6a56577550a224da611e41 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.runtime.io + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.io.ParseException +import org.apache.flink.api.java.io.CsvInputFormat +import org.apache.flink.api.java.io.CsvInputFormat.{DEFAULT_FIELD_DELIMITER, DEFAULT_LINE_DELIMITER, createDefaultMask, toBooleanMask} +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.runtime.io.RowCsvInputFormat.extractTypeClasses +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.core.fs.Path +import org.apache.flink.types.parser.FieldParser +import org.apache.flink.types.parser.FieldParser.ParseErrorState + +@Internal +@SerialVersionUID(1L) +class RowCsvInputFormat( + filePath: Path, + rowTypeInfo: RowTypeInfo, + lineDelimiter: String = DEFAULT_LINE_DELIMITER, + fieldDelimiter: String = DEFAULT_FIELD_DELIMITER, + includedFieldsMask: Array[Boolean] = null) + extends CsvInputFormat[Row](filePath) { + + if (rowTypeInfo.getArity == 0) { + throw new IllegalArgumentException("Row arity must be greater than 0.") + } + private val arity = rowTypeInfo.getArity + private lazy val defaultFieldMask = createDefaultMask(arity) + private val fieldsMask = Option(includedFieldsMask).getOrElse(defaultFieldMask) + + // prepare CsvInputFormat + setDelimiter(lineDelimiter) + setFieldDelimiter(fieldDelimiter) + setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo)) + + def this( + filePath: Path, + rowTypeInfo: RowTypeInfo, + lineDelimiter: String, + fieldDelimiter: String, + includedFieldsMask: Array[Int]) { + this( + filePath, + rowTypeInfo, + lineDelimiter, + fieldDelimiter, + if (includedFieldsMask == null) { + null + } else { + toBooleanMask(includedFieldsMask) + }) + } + + def this( + filePath: Path, + rowTypeInfo: RowTypeInfo, + includedFieldsMask: Array[Int]) { + this( + filePath, + rowTypeInfo, + DEFAULT_LINE_DELIMITER, + DEFAULT_FIELD_DELIMITER, + includedFieldsMask) + } + + def fillRecord(reuse: Row, parsedValues: Array[AnyRef]): Row = { + val reuseRow = if (reuse == null) { + new Row(arity) + } else { + reuse + } + var i: Int = 0 + while (i < parsedValues.length) { + reuseRow.setField(i, parsedValues(i)) + i += 1 + } + reuseRow + } + + @throws[ParseException] + override protected def parseRecord( + holders: Array[AnyRef], + bytes: Array[Byte], + offset: Int, + numBytes: Int) + : Boolean = { + val fieldDelimiter = this.getFieldDelimiter + val fieldIncluded: Array[Boolean] = this.fieldIncluded + + var startPos = offset + val limit = offset + numBytes + + var field = 0 + var output = 0 + while (field < fieldIncluded.length) { + + // check valid start position + if (startPos >= limit) { + if (isLenient) { + return false + } else { + throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)) + } + } + + if (fieldIncluded(field)) { + // parse field + val parser: FieldParser[AnyRef] = this.getFieldParsers()(output) + .asInstanceOf[FieldParser[AnyRef]] + val latestValidPos = startPos + startPos = parser.resetErrorStateAndParse( + bytes, + startPos, + limit, + fieldDelimiter, + holders(output)) + + if (!isLenient && (parser.getErrorState ne ParseErrorState.NONE)) { + // Row is able to handle null values + if (parser.getErrorState ne ParseErrorState.EMPTY_STRING) { + throw new ParseException(s"Parsing error for column $field of row '" + + new String(bytes, offset, numBytes) + + s"' originated by ${parser.getClass.getSimpleName}: ${parser.getErrorState}.") + } + } + holders(output) = parser.getLastResult + + // check
parse result + if (startPos < 0) { + holders(output) = null + startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter) + } + output += 1 + } else { + // skip field + startPos = skipFields(bytes, startPos, limit, fieldDelimiter) + } + + // check if something went wrong + if (startPos < 0) { + throw new ParseException(s"Unexpected parser position for column $field of row '" + + new String(bytes, offset, numBytes) + "'") + } + + field += 1 + } + true + } +} + +object RowCsvInputFormat { + + private def extractTypeClasses(rowTypeInfo: RowTypeInfo): Array[Class[_]] = { + val classes = for (i <- 0 until rowTypeInfo.getArity) + yield rowTypeInfo.getTypeAt(i).getTypeClass + classes.toArray + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala similarity index 90% rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ValuesInputFormat.scala rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala index 13b7fa08feb90403d0691aeb2866a57056b3e68b..5e0a466f8a74aa14a655e8e22fa0af390b834054 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ValuesInputFormat.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala @@ -16,9 +16,9 @@ * limitations under the License. */ -package org.apache.flink.api.table.runtime +package org.apache.flink.api.table.runtime.io -import org.apache.flink.api.common.io.{NonParallelInput, GenericInputFormat} +import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput} import org.apache.flink.api.table.Row class ValuesInputFormat(val rows: Seq[Row]) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala index 44796b218b8b967076e54d9b1ec4021a3c7348f2..54d7718df39160a556e33f5e31d45414b17bd814 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala @@ -19,14 +19,12 @@ package org.apache.flink.api.table.sources import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.io.TupleCsvInputFormat -import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TupleTypeInfoBase} +import org.apache.flink.api.java.io.CsvInputFormat import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.api.table.Row -import org.apache.flink.core.fs.Path +import org.apache.flink.api.table.{Row, TableException} +import org.apache.flink.api.table.runtime.io.RowCsvInputFormat import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.java.io.RowCsvInputFormat +import org.apache.flink.core.fs.Path /** * A [[TableSource]] for simple CSV files with a (logically) unlimited number of fields. 
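For context, a minimal usage sketch of the relocated Scala RowCsvInputFormat (not part of this patch): it builds a RowTypeInfo the same way the tests further down do and reads Rows directly from a CSV file via the inherited FileInputFormat API (createInputSplits, open, nextRecord), which the deleted Java tests also use. The object name RowCsvInputFormatExample and the path "/tmp/input.csv" are placeholders; the constructor call follows the primary constructor added above.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.runtime.io.RowCsvInputFormat
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path

object RowCsvInputFormatExample {
  def main(args: Array[String]): Unit = {
    // Row schema (String, Int, Double), matching the test data used further down.
    val typeInfo = new RowTypeInfo(Seq(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.DOUBLE_TYPE_INFO))

    // Primary constructor: file path, row type, line delimiter, field delimiter.
    // "/tmp/input.csv" is a placeholder path.
    val format = new RowCsvInputFormat(new Path("/tmp/input.csv"), typeInfo, "\n", "|")
    format.setLenient(true)
    format.configure(new Configuration)

    // Read every Row from the first input split.
    val splits = format.createInputSplits(1)
    format.open(splits(0))
    var row = new Row(3)
    while ({ row = format.nextRecord(row); row != null }) {
      println(row)
    }
    format.close()
  }
}

Compared to the deleted Java class, the overloaded constructors collapse into one primary constructor with default line and field delimiters, plus two auxiliary constructors for the Array[Int] field mask.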
@@ -45,19 +43,23 @@ class CsvTableSource( path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]], - fieldDelim: String = ",", - rowDelim: String = "\n", + fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER, + rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER, quoteCharacter: Character = null, ignoreFirstLine: Boolean = false, ignoreComments: String = null, lenient: Boolean = false) extends BatchTableSource[Row] { + if (fieldNames.length != fieldTypes.length) { + throw TableException("Number of field names and field types must be equal.") + } + + private val returnType = new RowTypeInfo(fieldTypes) + /** Returns the data of the table as a [[DataSet]] of [[Row]]. */ override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { - - val typeInfo = getReturnType.asInstanceOf[RowTypeInfo] - val inputFormat = new RowCsvInputFormat(new Path(path), rowDelim, fieldDelim, typeInfo) + val inputFormat = new RowCsvInputFormat(new Path(path), returnType, rowDelim, fieldDelim) inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine) inputFormat.setLenient(lenient) @@ -68,7 +70,7 @@ class CsvTableSource( inputFormat.setCommentPrefix(ignoreComments) } - execEnv.createInput(inputFormat, typeInfo) + execEnv.createInput(inputFormat, returnType) } /** Returns the types of the table fields. */ @@ -81,7 +83,5 @@ class CsvTableSource( override def getNumberOfFields: Int = fieldNames.length /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */ - override def getReturnType: RowTypeInfo = { - new RowTypeInfo(fieldTypes) - } + override def getReturnType: RowTypeInfo = returnType } diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java deleted file mode 100644 index 0ab94536ba6d41028c742410710dc28d7fc68c7b..0000000000000000000000000000000000000000 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java +++ /dev/null @@ -1,1086 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.io; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.io.ParseException; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.typeutils.RowTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.StringParser; -import org.junit.Test; - -public class RowCsvInputFormatTest { - - private static final Path PATH = new Path("an/ignored/file/"); - - //Static variables for testing the removal of \r\n to \n - private static final String FIRST_PART = "That is the first part"; - - private static final String SECOND_PART = "That is the second part"; - - @Test - public void ignoreInvalidLines() { - try { - String fileContent = "#description of the data\n" + - "header1|header2|header3|\n"+ - "this is|1|2.0|\n"+ - "//a comment\n" + - "a test|3|4.0|\n" + - "#next|5|6.0|\n"; - - FileInputSplit split = createTempFile(fileContent); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); - format.setLenient(false); - - Configuration parameters = new Configuration(); - format.configure(parameters); - format.open(split); - - Row result = new Row(3); - - try { - result = format.nextRecord(result); - fail("Parse Exception was not thrown! (Row too short)"); - } catch (ParseException ex) { - } - - try { - result = format.nextRecord(result); - fail("Parse Exception was not thrown! (Invalid int value)"); - } catch (ParseException ex) { - } - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("this is", result.productElement(0)); - assertEquals(new Integer(1), result.productElement(1)); - assertEquals(new Double(2.0), result.productElement(2)); - - try { - result = format.nextRecord(result); - fail("Parse Exception was not thrown! 
(Row too short)"); - } catch (ParseException ex) { - } - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("a test", result.productElement(0)); - assertEquals(new Integer(3), result.productElement(1)); - assertEquals(new Double(4.0), result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("#next", result.productElement(0)); - assertEquals(new Integer(5), result.productElement(1)); - assertEquals(new Double(6.0), result.productElement(2)); - - result = format.nextRecord(result); - assertNull(result); - - //re-open with lenient = true - format.setLenient(true); - format.configure(parameters); - format.open(split); - - result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("header1", result.productElement(0)); - assertNull(result.productElement(1)); - assertNull(result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("this is", result.productElement(0)); - assertEquals(new Integer(1), result.productElement(1)); - assertEquals(new Double(2.0), result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("a test", result.productElement(0)); - assertEquals(new Integer(3), result.productElement(1)); - assertEquals(new Double(4.0), result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("#next", result.productElement(0)); - assertEquals(new Integer(5), result.productElement(1)); - assertEquals(new Double(6.0), result.productElement(2)); - - result = format.nextRecord(result); - assertNull(result); - } - catch (Exception ex) { - ex.printStackTrace(); - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void ignoreSingleCharPrefixComments() { - try { - final String fileContent = - "#description of the data\n" + - "#successive commented line\n" + - "this is|1|2.0|\n" + - "a test|3|4.0|\n" + - "#next|5|6.0|\n"; - - FileInputSplit split = createTempFile(fileContent); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO }); - CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); - format.setCommentPrefix("#"); - - Configuration parameters = new Configuration(); - format.configure(parameters); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("this is", result.productElement(0)); - assertEquals(new Integer(1), result.productElement(1)); - assertEquals(new Double(2.0), result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("a test", result.productElement(0)); - assertEquals(new Integer(3), result.productElement(1)); - assertEquals(new Double(4.0), result.productElement(2)); - - result = format.nextRecord(result); - assertNull(result); - } - catch (Exception ex) { - ex.printStackTrace(); - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void ignoreMultiCharPrefixComments() { - try { - - - final String fileContent = "//description of the data\n" + - "//successive commented line\n" + - "this is|1|2.0|\n"+ - "a test|3|4.0|\n" + - "//next|5|6.0|\n"; - - final FileInputSplit split = createTempFile(fileContent); - - final RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] { - BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO }); - final CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); - format.setCommentPrefix("//"); - - final Configuration parameters = new Configuration(); - format.configure(parameters); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("this is", result.productElement(0)); - assertEquals(new Integer(1), result.productElement(1)); - assertEquals(new Double(2.0), result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("a test", result.productElement(0)); - assertEquals(new Integer(3), result.productElement(1)); - assertEquals(new Double(4.0), result.productElement(2)); - - result = format.nextRecord(result); - assertNull(result); - } - catch (Exception ex) { - ex.printStackTrace(); - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void readStringFields() { - try { - String fileContent = "abc|def|ghijk\nabc||hhg\n|||"; - FileInputSplit split = createTempFile(fileContent); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); - - final Configuration parameters = new Configuration(); - format.configure(parameters); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("abc", result.productElement(0)); - assertEquals("def", result.productElement(1)); - assertEquals("ghijk", result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("abc", result.productElement(0)); - assertEquals("", result.productElement(1)); - assertEquals("hhg", result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("", result.productElement(0)); - assertEquals("", result.productElement(1)); - assertEquals("", result.productElement(2)); - - result = format.nextRecord(result); - assertNull(result); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - ex.printStackTrace(); - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void readMixedQuotedStringFields() { - try { - String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||"; - FileInputSplit split = createTempFile(fileContent); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); - - Configuration parameters = new Configuration(); - format.configure(parameters); - format.enableQuotedStringParsing('@'); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("a|b|c", result.productElement(0)); - assertEquals("def", result.productElement(1)); - assertEquals("ghijk", result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("abc", result.productElement(0)); - assertEquals("", result.productElement(1)); - assertEquals("|hhg", result.productElement(2)); - - result = format.nextRecord(result); - 
assertNotNull(result); - assertEquals("", result.productElement(0)); - assertEquals("", result.productElement(1)); - assertEquals("", result.productElement(2)); - - result = format.nextRecord(result); - assertNull(result); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - ex.printStackTrace(); - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void readStringFieldsWithTrailingDelimiters() { - try { - String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"; - FileInputSplit split = createTempFile(fileContent); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); - - format.setFieldDelimiter("|-"); - - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("abc", result.productElement(0)); - assertEquals("def", result.productElement(1)); - assertEquals("ghijk", result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("abc", result.productElement(0)); - assertEquals("", result.productElement(1)); - assertEquals("hhg", result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("", result.productElement(0)); - assertEquals("", result.productElement(1)); - assertEquals("", result.productElement(2)); - - result = format.nextRecord(result); - assertNull(result); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void testIntegerFields() throws IOException { - try { - String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"; - FileInputSplit split = createTempFile(fileContent); - - RowTypeInfo typeInfo = new RowTypeInfo( - new TypeInformation[] { - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); - - format.setFieldDelimiter("|"); - - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(5); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Integer.valueOf(111), result.productElement(0)); - assertEquals(Integer.valueOf(222), result.productElement(1)); - assertEquals(Integer.valueOf(333), result.productElement(2)); - assertEquals(Integer.valueOf(444), result.productElement(3)); - assertEquals(Integer.valueOf(555), result.productElement(4)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Integer.valueOf(666), result.productElement(0)); - assertEquals(Integer.valueOf(777), result.productElement(1)); - assertEquals(Integer.valueOf(888), result.productElement(2)); - assertEquals(Integer.valueOf(999), result.productElement(3)); - assertEquals(Integer.valueOf(000), result.productElement(4)); - - result = format.nextRecord(result); - assertNull(result); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void testEmptyFields() throws IOException { - try{ - String fileContent = - "|0|0|0|0|0|\n" + - 
"1||1|1|1|1|\n" + - "2|2||2|2|2|\n" + - "3|3|3||3|3|\n" + - "4|4|4|4||4|\n" + - "5|5|5|5|5||\n"; - - FileInputSplit split = createTempFile(fileContent); - - //TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle correctly null values - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.SHORT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, -// BasicTypeInfo.FLOAT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, -// BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.BYTE_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo); - - format.setFieldDelimiter("|"); - - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(6); - int linesCnt = fileContent.split("\n").length; - - for (int i = 0; i < linesCnt; i++) { - result = format.nextRecord(result); - assertNull(result.productElement(i)); - } - - //ensure no more rows - assertNull(format.nextRecord(result)); - assertTrue(format.reachedEnd()); - } catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void testDoubleFields() throws IOException { - try { - String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n"; - FileInputSplit split = createTempFile(fileContent); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo); - - format.setFieldDelimiter("|"); - - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(5); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Double.valueOf(11.1), result.productElement(0)); - assertEquals(Double.valueOf(22.2), result.productElement(1)); - assertEquals(Double.valueOf(33.3), result.productElement(2)); - assertEquals(Double.valueOf(44.4), result.productElement(3)); - assertEquals(Double.valueOf(55.5), result.productElement(4)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Double.valueOf(66.6), result.productElement(0)); - assertEquals(Double.valueOf(77.7), result.productElement(1)); - assertEquals(Double.valueOf(88.8), result.productElement(2)); - assertEquals(Double.valueOf(99.9), result.productElement(3)); - assertEquals(Double.valueOf(00.0), result.productElement(4)); - - result = format.nextRecord(result); - assertNull(result); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void testReadFirstN() throws IOException { - try { - final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n"; - final FileInputSplit split = createTempFile(fileContent); - - final RowTypeInfo typeInfo = new RowTypeInfo( - new TypeInformation[] { - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo); - - format.setFieldDelimiter("|"); - - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(2); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Integer.valueOf(111), result.productElement(0)); - assertEquals(Integer.valueOf(222), result.productElement(1)); - - result = 
format.nextRecord(result); - assertNotNull(result); - assertEquals(Integer.valueOf(666), result.productElement(0)); - assertEquals(Integer.valueOf(777), result.productElement(1)); - - result = format.nextRecord(result); - assertNull(result); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - - } - - @Test - public void testReadSparseWithNullFieldsForTypes() throws IOException { - try { - String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" + - "000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|"; - FileInputSplit split = createTempFile(fileContent); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, - new boolean[] { true, false, false, true, false, false, false, true }); - - format.setFieldDelimiter("|x|"); - - format.setFieldDelimiter("|x|"); - - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Integer.valueOf(111), result.productElement(0)); - assertEquals(Integer.valueOf(444), result.productElement(1)); - assertEquals(Integer.valueOf(888), result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Integer.valueOf(000), result.productElement(0)); - assertEquals(Integer.valueOf(777), result.productElement(1)); - assertEquals(Integer.valueOf(333), result.productElement(2)); - - result = format.nextRecord(result); - assertNull(result); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void testReadSparseWithPositionSetter() throws IOException { - try { - String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|"; - FileInputSplit split = createTempFile(fileContent); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, new int[] { 0, 3, 7 }); - - format.setFieldDelimiter("|"); - - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Integer.valueOf(111), result.productElement(0)); - assertEquals(Integer.valueOf(444), result.productElement(1)); - assertEquals(Integer.valueOf(888), result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Integer.valueOf(000), result.productElement(0)); - assertEquals(Integer.valueOf(777), result.productElement(1)); - assertEquals(Integer.valueOf(333), result.productElement(2)); - - result = format.nextRecord(result); - assertNull(result); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void testReadSparseWithMask() throws IOException { - try { - String fileContent = - "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" + - "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&"; - FileInputSplit split = createTempFile(fileContent); - - 
RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, - new boolean[] { true, false, false, true, false, false, false, true }); - - format.setFieldDelimiter("&&"); - - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Integer.valueOf(111), result.productElement(0)); - assertEquals(Integer.valueOf(444), result.productElement(1)); - assertEquals(Integer.valueOf(888), result.productElement(2)); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals(Integer.valueOf(000), result.productElement(0)); - assertEquals(Integer.valueOf(777), result.productElement(1)); - assertEquals(Integer.valueOf(333), result.productElement(2)); - - result = format.nextRecord(result); - assertNull(result); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - } - - @Test - public void testParseStringErrors() throws Exception { - StringParser stringParser = new StringParser(); - stringParser.enableQuotedStringParsing((byte)'"'); - - Object[][] failures = { - {"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING}, - {"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING} - }; - - for (Object[] failure : failures) { - String input = (String) failure[0]; - - int result = stringParser.parseField(input.getBytes(), 0, input.length(), new byte[]{'|'}, null); - - assertThat(result, is(-1)); - assertThat(stringParser.getErrorState(), is(failure[1])); - } - - - } - - // Test disabled becase we do not support double-quote escaped quotes right now. - // @Test - public void testParserCorrectness() throws Exception { - // RFC 4180 Compliance Test content - // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example - String fileContent = - "Year,Make,Model,Description,Price\n" + - "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" + - "1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" + - "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" + - "1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" + - ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00"; - - FileInputSplit split = createTempFile(fileContent); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO - }); - CsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo); - - format.setSkipFirstLineAsHeader(true); - format.setFieldDelimiter(','); - - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(5); - - Row r1 = new Row(5); - r1.setField(0, 1997); - r1.setField(1, "Ford"); - r1.setField(2, "E350"); - r1.setField(3, "ac, abs, moon"); - r1.setField(4, 3000.0); - Row r2 = new Row(5); - r2.setField(0, 1999); - r2.setField(1, "Chevy"); - r2.setField(2, "Venture \"Extended Edition\""); - r2.setField(3, ""); - r2.setField(4, 4900.0); - Row r3 = new Row(5); - r3.setField(0, 1996); - r3.setField(1, "Jeep"); - r3.setField(2, "Grand Cherokee"); - r3.setField(3, "MUST SELL! 
air, moon roof, loaded"); - r3.setField(4, 4799.0); - Row r4 = new Row(5); - r4.setField(0, 1999); - r4.setField(1, "Chevy"); - r4.setField(2, "Venture \"Extended Edition, Very Large\""); - r4.setField(3, ""); - r4.setField(4, 5000.0); - Row r5 = new Row(5); - r5.setField(0, 0); - r5.setField(1, ""); - r5.setField(2, "Venture \"Extended Edition\""); - r5.setField(3, ""); - r5.setField(4, 4900.0); - - Row[] expectedLines = new Row[] { r1, r2, r3, r4, r5 }; - try { - for (Row expected : expectedLines) { - result = format.nextRecord(result); - assertEquals(expected, result); - } - - assertNull(format.nextRecord(result)); - assertTrue(format.reachedEnd()); - - } catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); - } - - } - - private FileInputSplit createTempFile(String content) throws IOException { - File tempFile = File.createTempFile("test_contents", "tmp"); - tempFile.deleteOnExit(); - - OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8); - wrt.write(content); - wrt.close(); - - return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); - } - - @Test - public void testWindowsLineEndRemoval() { - - //Check typical use case -- linux file is correct and it is set up to linuc(\n) - this.testRemovingTrailingCR("\n", "\n"); - - //Check typical windows case -- windows file endings and file has windows file endings set up - this.testRemovingTrailingCR("\r\n", "\r\n"); - - //Check problematic case windows file -- windows file endings(\r\n) but linux line endings (\n) set up - this.testRemovingTrailingCR("\r\n", "\n"); - - //Check problematic case linux file -- linux file endings (\n) but windows file endings set up (\r\n) - //Specific setup for windows line endings will expect \r\n because it has to be set up and is not standard. 
- } - - private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) { - File tempFile=null; - - String fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile; - - try { - // create input file - tempFile = File.createTempFile("CsvInputFormatTest", "tmp"); - tempFile.deleteOnExit(); - tempFile.setWritable(true); - - OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); - wrt.write(fileContent); - wrt.close(); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO }); - CsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo); - - Configuration parameters = new Configuration(); - inputFormat.configure(parameters); - - inputFormat.setDelimiter(lineBreakerSetup); - - FileInputSplit[] splits = inputFormat.createInputSplits(1); - - inputFormat.open(splits[0]); - - Row result = inputFormat.nextRecord(new Row(1)); - - assertNotNull("Expecting to not return null", result); - - assertEquals(FIRST_PART, result.productElement(0)); - - result = inputFormat.nextRecord(result); - - assertNotNull("Expecting to not return null", result); - assertEquals(SECOND_PART, result.productElement(0)); - - } - catch (Throwable t) { - System.err.println("test failed with exception: " + t.getMessage()); - t.printStackTrace(System.err); - fail("Test erroneous"); - } - } - - @Test - public void testQuotedStringParsingWithIncludeFields() throws Exception { - final String fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" + - "\"Blahblah \"|\"blaaa|\"blubb\""; - - final File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp"); - tempFile.deleteOnExit(); - tempFile.setWritable(true); - - OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile)); - writer.write(fileContent); - writer.close(); - - final RowTypeInfo typeInfo = new RowTypeInfo( - new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }); - CsvInputFormat inputFormat = new RowCsvInputFormat( - new Path(tempFile.toURI().toString()), typeInfo, new boolean[] { true, false, true }); - - inputFormat.enableQuotedStringParsing('"'); - inputFormat.setFieldDelimiter('|'); - inputFormat.setDelimiter('\n'); - - inputFormat.configure(new Configuration()); - FileInputSplit[] splits = inputFormat.createInputSplits(1); - - inputFormat.open(splits[0]); - - Row record = inputFormat.nextRecord(new Row(2)); - - assertEquals("20:41:52-1-3-2015", record.productElement(0)); - assertEquals("Blahblah ", record.productElement(1)); - } - - @Test - public void testQuotedStringParsingWithEscapedQuotes() throws Exception { - final String fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\""; - - final File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp"); - tempFile.deleteOnExit(); - tempFile.setWritable(true); - - OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile)); - writer.write(fileContent); - writer.close(); - - RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }); - CsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo); - - inputFormat.enableQuotedStringParsing('"'); - inputFormat.setFieldDelimiter('|'); - inputFormat.setDelimiter('\n'); - - inputFormat.configure(new Configuration()); - FileInputSplit[] splits = inputFormat.createInputSplits(1); - - 
inputFormat.open(splits[0]); - - Row record = inputFormat.nextRecord(new Row(2)); - - assertEquals("\\\"Hello\\\" World", record.productElement(0)); - assertEquals("We are\\\" young", record.productElement(1)); - } - - /** - * Tests that the CSV input format can deal with POJOs which are subclasses. - * - * @throws Exception - */ - @Test - public void testPojoSubclassType() throws Exception { - final String fileContent = "t1,foobar,tweet2\nt2,barfoo,tweet2"; - - final File tempFile = File.createTempFile("CsvReaderPOJOSubclass", "tmp"); - tempFile.deleteOnExit(); - - OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile)); - writer.write(fileContent); - writer.close(); - - PojoTypeInfo typeInfo = (PojoTypeInfo)TypeExtractor.createTypeInfo(TwitterPOJO.class); - CsvInputFormat inputFormat = new PojoCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo); - - inputFormat.configure(new Configuration()); - FileInputSplit[] splits = inputFormat.createInputSplits(1); - - inputFormat.open(splits[0]); - - List expected = new ArrayList<>(); - - for (String line: fileContent.split("\n")) { - String[] elements = line.split(","); - expected.add(new TwitterPOJO(elements[0], elements[1], elements[2])); - } - - List actual = new ArrayList<>(); - - TwitterPOJO pojo; - - while((pojo = inputFormat.nextRecord(new TwitterPOJO())) != null) { - actual.add(pojo); - } - - assertEquals(expected, actual); - } - - // -------------------------------------------------------------------------------------------- - // Custom types for testing - // -------------------------------------------------------------------------------------------- - - public static class PojoItem { - public int field1; - public String field2; - public Double field3; - public String field4; - } - - public static class PrivatePojoItem { - private int field1; - private String field2; - private Double field3; - private String field4; - - public int getField1() { - return field1; - } - - public void setField1(int field1) { - this.field1 = field1; - } - - public String getField2() { - return field2; - } - - public void setField2(String field2) { - this.field2 = field2; - } - - public Double getField3() { - return field3; - } - - public void setField3(Double field3) { - this.field3 = field3; - } - - public String getField4() { - return field4; - } - - public void setField4(String field4) { - this.field4 = field4; - } - } - - public static class POJO { - public String table; - public String time; - - public POJO() { - this("", ""); - } - - public POJO(String table, String time) { - this.table = table; - this.time = time; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof POJO) { - POJO other = (POJO) obj; - return table.equals(other.table) && time.equals(other.time); - } else { - return false; - } - } - } - - public static class TwitterPOJO extends POJO { - public String tweet; - - public TwitterPOJO() { - this("", "", ""); - } - - public TwitterPOJO(String table, String time, String tweet) { - super(table, time); - this.tweet = tweet; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof TwitterPOJO) { - TwitterPOJO other = (TwitterPOJO) obj; - return super.equals(other) && tweet.equals(other.tweet); - } else { - return false; - } - } - } - -} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala new file mode 100644 index 0000000000000000000000000000000000000000..540776d2d30a96b6bdc52f0913ef738b8d6ca888 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala @@ -0,0 +1,841 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.io + +import java.io.{File, FileOutputStream, OutputStreamWriter} +import java.nio.charset.StandardCharsets + +import org.apache.flink.api.common.io.ParseException +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.runtime.io.RowCsvInputFormatTest.{PATH, createTempFile, testRemovingTrailingCR} +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.core.fs.{FileInputSplit, Path} +import org.apache.flink.types.parser.{FieldParser, StringParser} +import org.junit.Assert._ +import org.junit.{Ignore, Test} + +class RowCsvInputFormatTest { + + @Test + def ignoreInvalidLines() { + val fileContent = + "#description of the data\n" + + "header1|header2|header3|\n" + + "this is|1|2.0|\n" + + "//a comment\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") + format.setLenient(false) + val parameters = new Configuration + format.configure(parameters) + format.open(split) + + var result = new Row(3) + try { + result = format.nextRecord(result) + fail("Parse Exception was not thrown! (Row too short)") + } + catch { + case ex: ParseException => // ok + } + + try { + result = format.nextRecord(result) + fail("Parse Exception was not thrown! (Invalid int value)") + } + catch { + case ex: ParseException => // ok + } + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("this is", result.productElement(0)) + assertEquals(1, result.productElement(1)) + assertEquals(2.0, result.productElement(2)) + + try { + result = format.nextRecord(result) + fail("Parse Exception was not thrown! 
(Row too short)") + } + catch { + case ex: ParseException => // ok + } + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("a test", result.productElement(0)) + assertEquals(3, result.productElement(1)) + assertEquals(4.0, result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("#next", result.productElement(0)) + assertEquals(5, result.productElement(1)) + assertEquals(6.0, result.productElement(2)) + + result = format.nextRecord(result) + assertNull(result) + + // re-open with lenient = true + format.setLenient(true) + format.configure(parameters) + format.open(split) + + result = new Row(3) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("header1", result.productElement(0)) + assertNull(result.productElement(1)) + assertNull(result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("this is", result.productElement(0)) + assertEquals(1, result.productElement(1)) + assertEquals(2.0, result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("a test", result.productElement(0)) + assertEquals(3, result.productElement(1)) + assertEquals(4.0, result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("#next", result.productElement(0)) + assertEquals(5, result.productElement(1)) + assertEquals(6.0, result.productElement(2)) + result = format.nextRecord(result) + assertNull(result) + } + + @Test + def ignoreSingleCharPrefixComments() { + val fileContent = + "#description of the data\n" + + "#successive commented line\n" + + "this is|1|2.0|\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") + format.setCommentPrefix("#") + format.configure(new Configuration) + format.open(split) + + var result = new Row(3) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("this is", result.productElement(0)) + assertEquals(1, result.productElement(1)) + assertEquals(2.0, result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("a test", result.productElement(0)) + assertEquals(3, result.productElement(1)) + assertEquals(4.0, result.productElement(2)) + + result = format.nextRecord(result) + assertNull(result) + } + + @Test + def ignoreMultiCharPrefixComments() { + val fileContent = + "//description of the data\n" + + "//successive commented line\n" + + "this is|1|2.0|\n" + + "a test|3|4.0|\n" + + "//next|5|6.0|\n" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") + format.setCommentPrefix("//") + format.configure(new Configuration) + format.open(split) + + var result = new Row(3) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("this is", result.productElement(0)) + assertEquals(1, result.productElement(1)) + assertEquals(2.0, result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("a test", result.productElement(0)) + assertEquals(3, result.productElement(1)) + assertEquals(4.0, 
result.productElement(2)) + result = format.nextRecord(result) + assertNull(result) + } + + @Test + def readStringFields() { + val fileContent = "abc|def|ghijk\nabc||hhg\n|||" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") + format.configure(new Configuration) + format.open(split) + + var result = new Row(3) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("abc", result.productElement(0)) + assertEquals("def", result.productElement(1)) + assertEquals("ghijk", result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("abc", result.productElement(0)) + assertEquals("", result.productElement(1)) + assertEquals("hhg", result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("", result.productElement(0)) + assertEquals("", result.productElement(1)) + assertEquals("", result.productElement(2)) + + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + + @Test def readMixedQuotedStringFields() { + val fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") + format.configure(new Configuration) + format.enableQuotedStringParsing('@') + format.open(split) + + var result = new Row(3) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("a|b|c", result.productElement(0)) + assertEquals("def", result.productElement(1)) + assertEquals("ghijk", result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("abc", result.productElement(0)) + assertEquals("", result.productElement(1)) + assertEquals("|hhg", result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("", result.productElement(0)) + assertEquals("", result.productElement(1)) + assertEquals("", result.productElement(2)) + + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + + @Test def readStringFieldsWithTrailingDelimiters() { + val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") + format.setFieldDelimiter("|-") + format.configure(new Configuration) + format.open(split) + + var result = new Row(3) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("abc", result.productElement(0)) + assertEquals("def", result.productElement(1)) + assertEquals("ghijk", result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("abc", result.productElement(0)) + assertEquals("", result.productElement(1)) + assertEquals("hhg", result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("", result.productElement(0)) + assertEquals("", result.productElement(1)) + assertEquals("", result.productElement(2)) + + result = 
format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + + @Test + def testIntegerFields() { + val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") + + format.setFieldDelimiter("|") + format.configure(new Configuration) + format.open(split) + + var result = new Row(5) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(111, result.productElement(0)) + assertEquals(222, result.productElement(1)) + assertEquals(333, result.productElement(2)) + assertEquals(444, result.productElement(3)) + assertEquals(555, result.productElement(4)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(666, result.productElement(0)) + assertEquals(777, result.productElement(1)) + assertEquals(888, result.productElement(2)) + assertEquals(999, result.productElement(3)) + assertEquals(0, result.productElement(4)) + + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + + @Test + def testEmptyFields() { + val fileContent = + "|0|0|0|0|0|\n" + + "1||1|1|1|1|\n" + + "2|2||2|2|2|\n" + + "3|3|3||3|3|\n" + + "4|4|4|4||4|\n" + + "5|5|5|5|5||\n" + + val split = createTempFile(fileContent) + + // TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO do not handle null values correctly + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.SHORT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.BYTE_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo) + format.setFieldDelimiter("|") + format.configure(new Configuration) + format.open(split) + + var result = new Row(6) + val linesCnt = fileContent.split("\n").length + + var i = 0 + while (i < linesCnt) { + result = format.nextRecord(result) + assertNull(result.productElement(i)) + i += 1 + } + + // ensure no more rows + assertNull(format.nextRecord(result)) + assertTrue(format.reachedEnd) + } + + @Test + def testDoubleFields() { + val fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo) + format.setFieldDelimiter("|") + format.configure(new Configuration) + format.open(split) + + var result = new Row(5) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(11.1, result.productElement(0)) + assertEquals(22.2, result.productElement(1)) + assertEquals(33.3, result.productElement(2)) + assertEquals(44.4, result.productElement(3)) + assertEquals(55.5, result.productElement(4)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(66.6, result.productElement(0)) + assertEquals(77.7, result.productElement(1)) + assertEquals(88.8, result.productElement(2)) + assertEquals(99.9, result.productElement(3)) + assertEquals(0.0, result.productElement(4)) + + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + + @Test + 
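// Only the first two of the five columns in each line are read, because the RowTypeInfo has arity 2. + 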
def testReadFirstN() { + val fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo) + format.setFieldDelimiter("|") + format.configure(new Configuration) + format.open(split) + + var result = new Row(2) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(111, result.productElement(0)) + assertEquals(222, result.productElement(1)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(666, result.productElement(0)) + assertEquals(777, result.productElement(1)) + + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + + @Test + def testReadSparseWithNullFieldsForTypes() { + val fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" + + "000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|" + + val split = createTempFile(fileContent) + + val typeInfo: RowTypeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO)) + + val format = new RowCsvInputFormat( + PATH, + rowTypeInfo = typeInfo, + includedFieldsMask = Array(true, false, false, true, false, false, false, true)) + format.setFieldDelimiter("|x|") + format.configure(new Configuration) + format.open(split) + + var result = new Row(3) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(111, result.productElement(0)) + assertEquals(444, result.productElement(1)) + assertEquals(888, result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(0, result.productElement(0)) + assertEquals(777, result.productElement(1)) + assertEquals(333, result.productElement(2)) + + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + + @Test + def testReadSparseWithPositionSetter() { + val fileContent = "111|222|333|444|555|666|777|888|999|000|\n" + + "000|999|888|777|666|555|444|333|222|111|" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO)) + + val format = new RowCsvInputFormat( + PATH, + typeInfo, + Array(0, 3, 7)) + format.setFieldDelimiter("|") + format.configure(new Configuration) + format.open(split) + + var result = new Row(3) + result = format.nextRecord(result) + + assertNotNull(result) + assertEquals(111, result.productElement(0)) + assertEquals(444, result.productElement(1)) + assertEquals(888, result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(0, result.productElement(0)) + assertEquals(777, result.productElement(1)) + assertEquals(333, result.productElement(2)) + + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + + @Test + def testReadSparseWithMask() { + val fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" + + "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&" + + val split = RowCsvInputFormatTest.createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO)) + + val format = new RowCsvInputFormat( + PATH, + rowTypeInfo = typeInfo, + includedFieldsMask = Array(true, false, false, true, false, 
false, false, true)) + format.setFieldDelimiter("&&") + format.configure(new Configuration) + format.open(split) + + var result = new Row(3) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(111, result.productElement(0)) + assertEquals(444, result.productElement(1)) + assertEquals(888, result.productElement(2)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(0, result.productElement(0)) + assertEquals(777, result.productElement(1)) + assertEquals(333, result.productElement(2)) + + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + + @Test + def testParseStringErrors() { + val stringParser = new StringParser + stringParser.enableQuotedStringParsing('"'.toByte) + + val failures = Seq( + ("\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING), + ("\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING) + ) + + for (failure <- failures) { + val result = stringParser.parseField( + failure._1.getBytes, + 0, + failure._1.length, + Array[Byte]('|'), + null) + + assertEquals(-1, result) + assertEquals(failure._2, stringParser.getErrorState) + } + } + + // Test disabled because we do not support double-quote escaped quotes right now. + @Test + @Ignore + def testParserCorrectness() { + // RFC 4180 Compliance Test content + // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example + val fileContent = "Year,Make,Model,Description,Price\n" + + "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" + + "1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" + + "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" + + "1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" + + ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO)) + + val format = new RowCsvInputFormat(PATH, typeInfo) + format.setSkipFirstLineAsHeader(true) + format.setFieldDelimiter(',') + format.configure(new Configuration) + format.open(split) + + var result = new Row(5) + val r1: Row = new Row(5) + r1.setField(0, 1997) + r1.setField(1, "Ford") + r1.setField(2, "E350") + r1.setField(3, "ac, abs, moon") + r1.setField(4, 3000.0) + + val r2: Row = new Row(5) + r2.setField(0, 1999) + r2.setField(1, "Chevy") + r2.setField(2, "Venture \"Extended Edition\"") + r2.setField(3, "") + r2.setField(4, 4900.0) + + val r3: Row = new Row(5) + r3.setField(0, 1996) + r3.setField(1, "Jeep") + r3.setField(2, "Grand Cherokee") + r3.setField(3, "MUST SELL! 
air, moon roof, loaded") + r3.setField(4, 4799.0) + + val r4: Row = new Row(5) + r4.setField(0, 1999) + r4.setField(1, "Chevy") + r4.setField(2, "Venture \"Extended Edition, Very Large\"") + r4.setField(3, "") + r4.setField(4, 5000.0) + + val r5: Row = new Row(5) + r5.setField(0, 0) + r5.setField(1, "") + r5.setField(2, "Venture \"Extended Edition\"") + r5.setField(3, "") + r5.setField(4, 4900.0) + + val expectedLines = Array(r1, r2, r3, r4, r5) + for (expected <- expectedLines) { + result = format.nextRecord(result) + assertEquals(expected, result) + } + assertNull(format.nextRecord(result)) + assertTrue(format.reachedEnd) + } + + @Test + def testWindowsLineEndRemoval() { + + // check typical use case -- linux file is correct and it is set up to linux(\n) + testRemovingTrailingCR("\n", "\n") + + // check typical windows case -- windows file endings and file has windows file endings set up + testRemovingTrailingCR("\r\n", "\r\n") + + // check problematic case windows file -- windows file endings(\r\n) + // but linux line endings (\n) set up + testRemovingTrailingCR("\r\n", "\n") + + // check problematic case linux file -- linux file endings (\n) + // but windows file endings set up (\r\n) + // specific setup for windows line endings will expect \r\n because + // it has to be set up and is not standard. + } + + @Test + def testQuotedStringParsingWithIncludeFields() { + val fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" + + "\"Blahblah \"|\"blaaa|\"blubb\"" + val tempFile = File.createTempFile("CsvReaderQuotedString", "tmp") + tempFile.deleteOnExit() + tempFile.setWritable(true) + + val writer = new OutputStreamWriter(new FileOutputStream(tempFile)) + writer.write(fileContent) + writer.close() + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO)) + + val inputFormat = new RowCsvInputFormat( + new Path(tempFile.toURI.toString), + rowTypeInfo = typeInfo, + includedFieldsMask = Array(true, false, true)) + inputFormat.enableQuotedStringParsing('"') + inputFormat.setFieldDelimiter('|') + inputFormat.setDelimiter('\n') + inputFormat.configure(new Configuration) + + val splits = inputFormat.createInputSplits(1) + inputFormat.open(splits(0)) + + val record = inputFormat.nextRecord(new Row(2)) + assertEquals("20:41:52-1-3-2015", record.productElement(0)) + assertEquals("Blahblah ", record.productElement(1)) + } + + @Test + def testQuotedStringParsingWithEscapedQuotes() { + val fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\"" + val tempFile = File.createTempFile("CsvReaderQuotedString", "tmp") + tempFile.deleteOnExit() + tempFile.setWritable(true) + + val writer = new OutputStreamWriter(new FileOutputStream(tempFile)) + writer.write(fileContent) + writer.close() + + val typeInfo = new RowTypeInfo(Seq( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO)) + + val inputFormat = new RowCsvInputFormat( + new Path(tempFile.toURI.toString), + rowTypeInfo = typeInfo) + inputFormat.enableQuotedStringParsing('"') + inputFormat.setFieldDelimiter('|') + inputFormat.setDelimiter('\n') + inputFormat.configure(new Configuration) + + val splits = inputFormat.createInputSplits(1) + inputFormat.open(splits(0)) + + val record = inputFormat.nextRecord(new Row(2)) + assertEquals("\\\"Hello\\\" World", record.productElement(0)) + assertEquals("We are\\\" young", record.productElement(1)) + } +} + +object RowCsvInputFormatTest { + + private val PATH = new Path("an/ignored/file/") + + // static variables for 
testing that trailing \r characters are removed (\r\n vs. \n line endings) + private val FIRST_PART = "That is the first part" + private val SECOND_PART = "That is the second part" + + private def createTempFile(content: String): FileInputSplit = { + val tempFile = File.createTempFile("test_contents", "tmp") + tempFile.deleteOnExit() + val wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8) + wrt.write(content) + wrt.close() + new FileInputSplit( + 0, + new Path(tempFile.toURI.toString), + 0, + tempFile.length, + Array("localhost")) + } + + private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String) { + val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile + + // create input file + val tempFile = File.createTempFile("CsvInputFormatTest", "tmp") + tempFile.deleteOnExit() + tempFile.setWritable(true) + + val wrt = new OutputStreamWriter(new FileOutputStream(tempFile)) + wrt.write(fileContent) + wrt.close() + + val typeInfo = new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO)) + + val inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI.toString), typeInfo) + inputFormat.configure(new Configuration) + inputFormat.setDelimiter(lineBreakerSetup) + + val splits = inputFormat.createInputSplits(1) + inputFormat.open(splits(0)) + + var result = inputFormat.nextRecord(new Row(1)) + assertNotNull("Expected a non-null record", result) + assertEquals(FIRST_PART, result.productElement(0)) + + result = inputFormat.nextRecord(result) + assertNotNull("Expected a non-null record", result) + assertEquals(SECOND_PART, result.productElement(0)) + } +}
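For context, every test above exercises the same low-level read pattern against RowCsvInputFormat. Below is a minimal, illustrative sketch of that pattern outside of a test. It assumes the Scala RowCsvInputFormat introduced by this change lives in org.apache.flink.api.table.runtime.io (the tests use it unqualified from that package) and takes (path, rowTypeInfo, lineDelimiter, fieldDelimiter) as constructed in the tests; the input path, the "|" field delimiter, and the object name RowCsvReadSketch are placeholders, not part of this change.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.runtime.io.RowCsvInputFormat // assumed package, see note above
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path

object RowCsvReadSketch {
  def main(args: Array[String]): Unit = {
    // schema of the CSV file: a string, an int and a double column
    val typeInfo = new RowTypeInfo(Seq(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.DOUBLE_TYPE_INFO))

    // "/tmp/input.csv" is a placeholder; "\n" and "|" match the delimiters used in most tests above
    val format = new RowCsvInputFormat(new Path("/tmp/input.csv"), typeInfo, "\n", "|")
    format.configure(new Configuration)

    // read every split with the open / nextRecord / reachedEnd pattern the tests follow
    for (split <- format.createInputSplits(1)) {
      format.open(split)
      var reuse = new Row(3)
      while (!format.reachedEnd()) {
        val row = format.nextRecord(reuse)
        if (row != null) {
          println(s"${row.productElement(0)} | ${row.productElement(1)} | ${row.productElement(2)}")
          reuse = row
        }
      }
      format.close()
    }
  }
}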