From 3d07a97c82c3947935bf13b3f60aafce274a6cff Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Mon, 26 Nov 2018 16:11:04 +0100 Subject: [PATCH] [FLINK-9964][table] Add a full CSV table format factory It adds CsvRowSerializationSchema, CsvRowDeserializationSchema, a new CSV descriptor, CsvRowTableFormatFactory, documentation and tests The format integrates nicely with most SQL types. It deprecates the "old CSV" descriptor stack and prepares also for FLINK-7050 (#4660). The old CSV descriptor is still available under as "OldCsv". This closes #7777. --- docs/dev/table/connect.md | 191 ++++++-- .../kryo/KryoGenericTypeSerializerTest.java | 2 +- .../flink-sql-client-test/pom.xml | 15 + .../apache/flink/table/descriptors/Avro.java | 2 + flink-formats/flink-csv/pom.xml | 62 ++- .../csv/CsvRowDeserializationSchema.java | 407 ++++++++++++------ .../formats/csv/CsvRowFormatFactory.java | 138 +++--- .../formats/csv/CsvRowSchemaConverter.java | 87 ++-- .../csv/CsvRowSerializationSchema.java | 398 ++++++++++------- .../apache/flink/table/descriptors/Csv.java | 180 ++++++++ .../flink/table/descriptors/CsvValidator.java | 69 +++ .../csv/CsvRowDeSerializationSchemaTest.java | 260 +++++++++++ .../csv/CsvRowDeserializationSchemaTest.java | 150 ------- .../formats/csv/CsvRowFormatFactoryTest.java | 116 ++--- .../csv/CsvRowSchemaConverterTest.java | 75 ---- .../csv/CsvRowSerializationSchemaTest.java | 234 ---------- .../flink/table/descriptors/CsvTest.java | 113 +++++ flink-formats/flink-json/pom.xml | 5 - .../json/JsonRowDeserializationSchema.java | 2 +- .../apache/flink/table/descriptors/Json.java | 4 +- flink-formats/pom.xml | 1 + .../descriptors/{Csv.scala => OldCsv.scala} | 81 ++-- ...vValidator.scala => OldCsvValidator.scala} | 37 +- .../table/sinks/CsvTableSinkFactoryBase.scala | 4 +- .../sources/CsvTableSourceFactoryBase.scala | 4 +- .../{CsvTest.scala => OldCsvTest.scala} | 20 +- .../descriptors/TableDescriptorTest.scala | 2 +- .../table/runtime/utils/CommonTestData.scala | 8 +- tools/travis/stage.sh | 2 + 29 files changed, 1611 insertions(+), 1058 deletions(-) create mode 100644 flink-formats/flink-csv/src/main/java/org/apache/flink/table/descriptors/Csv.java create mode 100644 flink-formats/flink-csv/src/main/java/org/apache/flink/table/descriptors/CsvValidator.java create mode 100644 flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java delete mode 100644 flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeserializationSchemaTest.java delete mode 100644 flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSchemaConverterTest.java delete mode 100644 flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSerializationSchemaTest.java create mode 100644 flink-formats/flink-csv/src/test/java/org/apache/flink/table/descriptors/CsvTest.java rename flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/{Csv.scala => OldCsv.scala} (74%) rename flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/{CsvValidator.scala => OldCsvValidator.scala} (57%) rename flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/{CsvTest.scala => OldCsvTest.scala} (88%) diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md index 27c6920fcd7..98e089108f4 100644 --- a/docs/dev/table/connect.md +++ b/docs/dev/table/connect.md @@ -52,11 +52,12 @@ The following tables list all available connectors and formats. 
Their mutual com ### Formats -| Name | Maven dependency | SQL Client JAR | -| :---------------- | :--------------------------- | :--------------------- | -| CSV | Built-in | Built-in | -| JSON | `flink-json` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) | -| Apache Avro | `flink-avro` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) | +| Name | Maven dependency | SQL Client JAR | +| :------------------------- | :--------------------------- | :--------------------- | +| Old CSV (for files) | Built-in | Built-in | +| CSV (for Kafka) | `flink-csv` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-csv/{{site.version}}/flink-csv-{{site.version}}-sql-jar.jar) | +| JSON | `flink-json` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) | +| Apache Avro | `flink-avro` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) | {% else %} @@ -708,21 +709,47 @@ A format tag indicates the format type for matching with a connector. ### CSV Format -The CSV format allows to read and write comma-separated rows. +Format: Serialization Schema +Format: Deserialization Schema + +The CSV format aims to comply with [RFC-4180](https://tools.ietf.org/html/rfc4180) ("Common Format and +MIME Type for Comma-Separated Values (CSV) Files") proposed by the Internet Engineering Task Force (IETF). + +The format allows to read and write CSV data that corresponds to a given format schema. The format schema can be +defined either as a Flink type or derived from the desired table schema. + +If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for +defining schema information only once. The names, types, and fields' order of the format are determined by the +table's schema. Time attributes are ignored if their origin is not a field. A `from` definition in the table +schema is interpreted as a field renaming in the format. + +The CSV format can be used as follows:
{% highlight java %}
.withFormat(
  new Csv()
-    .field("field1", Types.STRING)    // required: ordered format fields
-    .field("field2", Types.TIMESTAMP)
-    .fieldDelimiter(",")              // optional: string delimiter "," by default
-    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
-    .quoteCharacter('"')              // optional: single character for string values, empty by default
-    .commentPrefix('#')               // optional: string to indicate comments, empty by default
-    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
-    .ignoreParseErrors()              // optional: skip records with parse error instead of failing by default
+
+    // required: define the schema either by using type information
+    .schema(Types.ROW(...))
+
+    // or use the table's schema
+    .deriveSchema()
+
+    .fieldDelimiter(';')              // optional: field delimiter character (',' by default)
+    .lineDelimiter("\r\n")            // optional: line delimiter ("\n" by default;
+                                      //   otherwise "\r" or "\r\n" are allowed)
+    .quoteCharacter('\'')             // optional: quote character for enclosing field values ('"' by default)
+    .allowComments()                  // optional: ignores comment lines that start with '#' (disabled by default);
+                                      //   if enabled, make sure to also ignore parse errors to allow empty rows
+    .ignoreParseErrors()              // optional: skip fields and rows with parse errors instead of failing;
+                                      //   fields are set to null in case of errors
+    .arrayElementDelimiter("|")       // optional: the array element delimiter string for separating
+                                      //   array and row element values (";" by default)
+    .escapeCharacter('\\')            // optional: escape character for escaping values (disabled by default)
+    .nullLiteral("n/a")               // optional: null literal string that is interpreted as a
+                                      //   null value (disabled by default)
)
{% endhighlight %}
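For illustration, a complete table declaration using this format could look like the following sketch. The `Kafka` and `Schema` descriptors, the `tableEnvironment` variable, and names such as the topic and field names are assumptions taken from the connector and schema sections of this page, not prescribed by the format itself.

{% highlight java %}
// Sketch: a Kafka-backed table that uses the CSV format and derives the
// format schema from the table schema (topic and field names are placeholders).
tableEnvironment
  .connect(
    new Kafka()
      .version("0.11")
      .topic("rides"))
  .withFormat(
    new Csv()
      .deriveSchema())
  .withSchema(
    new Schema()
      .field("lon", "FLOAT")
      .field("rideTime", "TIMESTAMP"))
  .inAppendMode()
  .registerTableSource("Rides");
{% endhighlight %}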
@@ -731,24 +758,76 @@ The CSV format allows to read and write comma-separated rows. {% highlight yaml %} format: type: csv - fields: # required: ordered format fields - - name: field1 - type: VARCHAR - - name: field2 - type: TIMESTAMP - field-delimiter: "," # optional: string delimiter "," by default - line-delimiter: "\n" # optional: string delimiter "\n" by default - quote-character: '"' # optional: single character for string values, empty by default - comment-prefix: '#' # optional: string to indicate comments, empty by default - ignore-first-line: false # optional: boolean flag to ignore the first line, by default it is not skipped - ignore-parse-errors: true # optional: skip records with parse error instead of failing by default + + # required: define the schema either by using type information + schema: "ROW(lon FLOAT, rideTime TIMESTAMP)" + + # or use the table's schema + derive-schema: true + + field-delimiter: ";" # optional: field delimiter character (',' by default) + line-delimiter: "\r\n" # optional: line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed) + quote-character: "'" # optional: quote character for enclosing field values ('"' by default) + allow-comments: true # optional: ignores comment lines that start with "#" (disabled by default); + # if enabled, make sure to also ignore parse errors to allow empty rows + ignore-parse-errors: true # optional: skip fields and rows with parse errors instead of failing; + # fields are set to null in case of errors + array-element-delimiter: "|" # optional: the array element delimiter string for separating + # array and row element values (";" by default) + escape-character: "\\" # optional: escape character for escaping values (disabled by default) + null-literal: "n/a" # optional: null literal string that is interpreted as a + # null value (disabled by default) {% endhighlight %}
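Both schemas can also be constructed programmatically. The following sketch uses the builders of `CsvRowDeserializationSchema` and `CsvRowSerializationSchema` directly; the row type, delimiters, and sample input are illustrative only, and `Types` refers to `org.apache.flink.api.common.typeinfo.Types`.

{% highlight java %}
// Sketch: configuring the CSV (de)serialization schemas without the descriptor API.
TypeInformation<Row> typeInfo = Types.ROW_NAMED(
    new String[]{"lon", "rideTime"},
    Types.FLOAT, Types.SQL_TIMESTAMP);

CsvRowDeserializationSchema deserializationSchema =
    new CsvRowDeserializationSchema.Builder(typeInfo)
        .setFieldDelimiter(';')
        .setQuoteCharacter('\'')
        .setIgnoreParseErrors(true)    // malformed rows yield null instead of an exception
        .build();

CsvRowSerializationSchema serializationSchema =
    new CsvRowSerializationSchema.Builder(typeInfo)
        .setFieldDelimiter(';')
        .build();

// deserialize(byte[]) declares IOException; with the settings above it returns
// a two-field Row for input such as "1.23;2018-01-01 20:43:59.999"
Row row = deserializationSchema.deserialize("1.23;2018-01-01 20:43:59.999".getBytes());
{% endhighlight %}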
-The CSV format is included in Flink and does not require additional dependencies. +The following table lists supported types that can be read and written: + +| Supported Flink SQL Types | +| :------------------------ | +| `ROW` | +| `VARCHAR` | +| `ARRAY[_]` | +| `INT` | +| `BIGINT` | +| `FLOAT` | +| `DOUBLE` | +| `BOOLEAN` | +| `DATE` | +| `TIME` | +| `TIMESTAMP` | +| `DECIMAL` | +| `NULL` (unsupported yet) | + +**Numeric types:** Value should be a number but the literal `"null"` can also be understood. An empty string is +considered `null`. Values are also trimmed (leading/trailing white space). Numbers are parsed using +Java's `valueOf` semantics. Other non-numeric strings may cause a parsing exception. + +**String and time types:** Value is not trimmed. The literal `"null"` can also be understood. Time types +must be formatted according to the Java SQL time format with millisecond precision. For example: +`2018-01-01` for date, `20:43:59` for time, and `2018-01-01 20:43:59.999` for timestamp. + +**Boolean type:** Value is expected to be a boolean (`"true"`, `"false"`) string or `"null"`. Empty strings are +interpreted as `false`. Values are trimmed (leading/trailing white space). Other values result in an exception. + +**Nested types:** Array and row types are supported for one level of nesting using the array element delimiter. + +**Primitive byte arrays:** Primitive byte arrays are handled in Base64-encoded representation. + +**Line endings:** Line endings need to be considered even for row-based connectors (such as Kafka) +to be ignored for unquoted string fields at the end of a row. + +**Escaping and quoting:** The following table shows examples of how escaping and quoting affect the parsing +of a string using `*` for escaping and `'` for quoting: + +| CSV Field | Parsed String | +| :---------------- | :------------------- | +| `123*'4**` | `123'4*` | +| `'123''4**'` | `123'4*` | +| `'a;b*'c'` | `a;b'c` | +| `'a;b''c'` | `a;b'c` | -Attention The CSV format for writing rows is limited at the moment. Only a custom field delimiter is supported as optional parameter. +Make sure to add the CSV format as a dependency. ### JSON Format @@ -757,7 +836,9 @@ The CSV format is included in Flink and does not require additional dependencies The JSON format allows to read and write JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink type, as a JSON schema, or derived from the desired table schema. A Flink type enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures. -If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and field order of the format are determined by the table's schema. Time attributes are ignored if their origin is not a field. A `from` definition in the table schema is interpreted as a field renaming in the format. +If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and fields' order of the format are determined by the table's schema. Time attributes are ignored if their origin is not a field. A `from` definition in the table schema is interpreted as a field renaming in the format. + +The JSON format can be used as follows:
@@ -837,7 +918,6 @@ The following table shows the mapping of JSON schema types to Flink SQL types: | `string` with `encoding: base64` | `ARRAY[TINYINT]` | | `null` | `NULL` (unsupported yet)| - Currently, Flink supports only a subset of the [JSON schema specification](http://json-schema.org/) `draft-07`. Union types (as well as `allOf`, `anyOf`, `not`) are not supported yet. `oneOf` and arrays of types are only supported for specifying nullability. Simple references that link to a common definition in the document are supported as shown in the more complex example below: @@ -891,7 +971,6 @@ Simple references that link to a common definition in the document are supported Make sure to add the JSON format as a dependency. - ### Apache Avro Format Format: Serialization Schema @@ -899,6 +978,8 @@ Make sure to add the JSON format as a dependency. The [Apache Avro](https://avro.apache.org/) format allows to read and write Avro data that corresponds to a given format schema. The format schema can be defined either as a fully qualified class name of an Avro specific record or as an Avro schema string. If a class name is used, the class must be available in the classpath during runtime. +The Avro format can be used as follows: +
{% highlight java %} @@ -975,6 +1056,56 @@ Avro uses [Joda-Time](http://www.joda.org/joda-time/) for representing logical d Make sure to add the Apache Avro dependency. +### Old CSV Format + +Attention For prototyping purposes only! + +The old CSV format allows to read and write comma-separated rows using the filesystem connector. + +This format describes Flink's non-standard CSV table source/sink. In the future, the format will be +replaced by a proper RFC-compliant version. Use the RFC-compliant CSV format when writing to Kafka. +Use the old one for stream/batch filesystem operations for now. + +
+
+{% highlight java %}
+.withFormat(
+  new OldCsv()
+    .field("field1", Types.STRING)    // required: ordered format fields
+    .field("field2", Types.TIMESTAMP)
+    .fieldDelimiter(",")              // optional: string delimiter "," by default
+    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
+    .quoteCharacter('"')              // optional: single character for string values, empty by default
+    .commentPrefix("#")               // optional: string to indicate comments, empty by default
+    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
+    .ignoreParseErrors()              // optional: skip records with parse error instead of failing by default
+)
+{% endhighlight %}
+
+ +
+{% highlight yaml %} +format: + type: csv + fields: # required: ordered format fields + - name: field1 + type: VARCHAR + - name: field2 + type: TIMESTAMP + field-delimiter: "," # optional: string delimiter "," by default + line-delimiter: "\n" # optional: string delimiter "\n" by default + quote-character: '"' # optional: single character for string values, empty by default + comment-prefix: '#' # optional: string to indicate comments, empty by default + ignore-first-line: false # optional: boolean flag to ignore the first line, by default it is not skipped + ignore-parse-errors: true # optional: skip records with parse error instead of failing by default +{% endhighlight %} +
+
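For reference, a possible declaration with the filesystem connector is sketched below. The `FileSystem` and `Schema` descriptors and the `tableEnvironment` variable are assumed from the connector sections of this page; the path, field names, and types are placeholders.

{% highlight java %}
// Sketch: a file-backed table using the old CSV format.
tableEnvironment
  .connect(
    new FileSystem()
      .path("/path/to/data.csv"))
  .withFormat(
    new OldCsv()
      .field("field1", "VARCHAR")
      .field("field2", "TIMESTAMP")
      .fieldDelimiter(","))
  .withSchema(
    new Schema()
      .field("field1", "VARCHAR")
      .field("field2", "TIMESTAMP"))
  // in a streaming environment, an update mode such as .inAppendMode() may also be required
  .registerTableSource("OldCsvTable");
{% endhighlight %}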
+ +The old CSV format is included in Flink and does not require additional dependencies. + +Attention The old CSV format for writing rows is limited at the moment. Only a custom field delimiter is supported as optional parameter. + {% top %} Further TableSources and TableSinks diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java index 12090f8814f..69bd5e21adb 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java @@ -165,4 +165,4 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer Kryo kryo = serializer.getKryo(); assertTrue(kryo.getReferences()); } -} \ No newline at end of file +} diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml index ad54b3a6625..0e72c097ccd 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml +++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml @@ -60,6 +60,14 @@ under the License. sql-jar provided + + + org.apache.flink + flink-csv + ${project.version} + sql-jar + provided + org.apache.flink @@ -159,6 +167,13 @@ under the License. sql-jar jar + + org.apache.flink + flink-csv + ${project.version} + sql-jar + jar + - flink-table_2.11 + flink-core ${project.version} provided - - true - - com.fasterxml.jackson.dataformat - jackson-dataformat-csv - 2.7.9 + org.apache.flink + flink-table-common + ${project.version} + provided + org.apache.flink - flink-test-utils-junit + flink-table-common + ${project.version} + test + test-jar + org.apache.flink - - flink-table_2.11 + flink-table-planner_${scala.binary.version} ${project.version} - test-jar test - + org.scala-lang scala-compiler test + + + + + sql-jars + + + !skipSqlJars + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + package + + jar + + + sql-jar + + + + + + + + diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java index 7e328eef545..8cfcab26a8d 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java @@ -18,72 +18,153 @@ package org.apache.flink.formats.csv; -import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.NullNode; -import 
com.fasterxml.jackson.databind.node.TextNode; -import com.fasterxml.jackson.dataformat.csv.CsvMapper; -import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; import java.io.IOException; -import java.io.UnsupportedEncodingException; +import java.io.Serializable; import java.lang.reflect.Array; import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Objects; /** * Deserialization schema from CSV to Flink types. * *

Deserializes a byte[] message as a {@link JsonNode} and - * convert it to {@link Row}. + * converts it to {@link Row}. * - *

Failure during deserialization are forwarded as wrapped IOExceptions. + *

Failure during deserialization are forwarded as wrapped {@link IOException}s. */ -@Public +@PublicEvolving public final class CsvRowDeserializationSchema implements DeserializationSchema { - /** Schema describing the input csv data. */ - private CsvSchema csvSchema; + private static final long serialVersionUID = 2135553495874539201L; - /** Type information describing the input csv data. */ - private TypeInformation rowTypeInfo; + /** Type information describing the result type. */ + private final TypeInformation typeInfo; - /** ObjectReader used to read message, it will be changed when csvSchema is changed. */ - private ObjectReader objectReader; + /** Runtime instance that performs the actual work. */ + private final RuntimeConverter runtimeConverter; - /** Charset for byte[]. */ - private String charset = "UTF-8"; + /** Schema describing the input CSV data. */ + private final CsvSchema csvSchema; + + /** Object reader used to read rows. It is configured by {@link CsvSchema}. */ + private final ObjectReader objectReader; + + /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */ + private final boolean ignoreParseErrors; + + private CsvRowDeserializationSchema( + RowTypeInfo typeInfo, + CsvSchema csvSchema, + boolean ignoreParseErrors) { + this.typeInfo = typeInfo; + this.runtimeConverter = createRowRuntimeConverter(typeInfo, ignoreParseErrors, true); + this.csvSchema = csvSchema; + this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); + this.ignoreParseErrors = ignoreParseErrors; + } /** - * Create a csv row DeserializationSchema with given {@link TypeInformation}. + * A builder for creating a {@link CsvRowDeserializationSchema}. */ - public CsvRowDeserializationSchema(TypeInformation rowTypeInfo) { - Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not be null !"); - CsvMapper csvMapper = new CsvMapper(); - this.rowTypeInfo = rowTypeInfo; - this.csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo); - this.objectReader = csvMapper.readerFor(JsonNode.class).with(csvSchema); - this.setNullValue("null"); + @PublicEvolving + public static class Builder { + + private final RowTypeInfo typeInfo; + private CsvSchema csvSchema; + private boolean ignoreParseErrors; + + /** + * Creates a CSV deserialization schema for the given {@link TypeInformation} with + * optional parameters. 
+ */ + public Builder(TypeInformation typeInfo) { + Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + + if (!(typeInfo instanceof RowTypeInfo)) { + throw new IllegalArgumentException("Row type information expected."); + } + + this.typeInfo = (RowTypeInfo) typeInfo; + this.csvSchema = CsvRowSchemaConverter.convert((RowTypeInfo) typeInfo); + } + + public Builder setFieldDelimiter(char delimiter) { + this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(delimiter).build(); + return this; + } + + public Builder setAllowComments(boolean allowComments) { + this.csvSchema = this.csvSchema.rebuild().setAllowComments(allowComments).build(); + return this; + } + + public Builder setArrayElementDelimiter(String delimiter) { + Preconditions.checkNotNull(delimiter, "Array element delimiter must not be null."); + this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build(); + return this; + } + + public Builder setQuoteCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build(); + return this; + } + + public Builder setEscapeCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build(); + return this; + } + + public Builder setNullLiteral(String nullLiteral) { + Preconditions.checkNotNull(nullLiteral, "Null literal must not be null."); + this.csvSchema = this.csvSchema.rebuild().setNullValue(nullLiteral).build(); + return this; + } + + public Builder setIgnoreParseErrors(boolean ignoreParseErrors) { + this.ignoreParseErrors = ignoreParseErrors; + return this; + } + + public CsvRowDeserializationSchema build() { + return new CsvRowDeserializationSchema( + typeInfo, + csvSchema, + ignoreParseErrors); + } } @Override public Row deserialize(byte[] message) throws IOException { - JsonNode root = objectReader.readValue(message); - return convertRow(root, (RowTypeInfo) rowTypeInfo); + try { + final JsonNode root = objectReader.readValue(message); + return (Row) runtimeConverter.convert(root); + } catch (Throwable t) { + if (ignoreParseErrors) { + return null; + } + throw new IOException("Failed to deserialize CSV row '" + new String(message) + "'.", t); + } } @Override @@ -93,133 +174,195 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema< @Override public TypeInformation getProducedType() { - return rowTypeInfo; + return typeInfo; } - private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) { - String[] fields = rowTypeInfo.getFieldNames(); - TypeInformation[] types = rowTypeInfo.getFieldTypes(); - Row row = new Row(fields.length); - - for (int i = 0; i < fields.length; i++) { - String columnName = fields[i]; - JsonNode node = root.get(columnName); - row.setField(i, convert(node, types[i])); + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || o.getClass() != this.getClass()) { + return false; } - return row; + final CsvRowDeserializationSchema that = (CsvRowDeserializationSchema) o; + final CsvSchema otherSchema = that.csvSchema; + + return typeInfo.equals(that.typeInfo) && + ignoreParseErrors == that.ignoreParseErrors && + csvSchema.getColumnSeparator() == otherSchema.getColumnSeparator() && + csvSchema.allowsComments() == otherSchema.allowsComments() && + csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator()) && + csvSchema.getQuoteChar() == otherSchema.getQuoteChar() && + csvSchema.getEscapeChar() == otherSchema.getEscapeChar() && + Arrays.equals(csvSchema.getNullValue(), 
otherSchema.getNullValue()); } - private Row convertRow(ArrayNode node, RowTypeInfo rowTypeInfo) { - TypeInformation[] types = rowTypeInfo.getFieldTypes(); - String[] fields = rowTypeInfo.getFieldNames(); - Row row = new Row(fields.length); - for (int i = 0; i < fields.length; i++) { - row.setField(i, convert(node.get(i), types[i])); - } - return row; + @Override + public int hashCode() { + return Objects.hash( + typeInfo, + ignoreParseErrors, + csvSchema.getColumnSeparator(), + csvSchema.allowsComments(), + csvSchema.getArrayElementSeparator(), + csvSchema.getQuoteChar(), + csvSchema.getEscapeChar(), + csvSchema.getNullValue()); } - /** - * Converts json node to object with given type information. - * @param node json node to be converted. - * @param info type information for the json data. - * @return converted object - */ - private Object convert(JsonNode node, TypeInformation info) { - if (node instanceof NullNode) { - return null; - } - if (info == Types.STRING) { - return node.asText(); - } else if (info == Types.LONG) { - return node.asLong(); - } else if (info == Types.INT) { - return node.asInt(); - } else if (info == Types.DOUBLE) { - return node.asDouble(); - } else if (info == Types.FLOAT) { - return Double.valueOf(node.asDouble()).floatValue(); - } else if (info == Types.BIG_DEC) { - return BigDecimal.valueOf(node.asDouble()); - } else if (info == Types.BIG_INT) { - return BigInteger.valueOf(node.asLong()); - } else if (info == Types.SQL_DATE) { - return Date.valueOf(node.asText()); - } else if (info == Types.SQL_TIME) { - return Time.valueOf(node.asText()); - } else if (info == Types.SQL_TIMESTAMP) { - return Timestamp.valueOf(node.asText()); - } else if (info == Types.BOOLEAN) { - return node.asBoolean(); - } else if (info instanceof RowTypeInfo) { - return convertRow((ArrayNode) node, (RowTypeInfo) info); - } else if (info instanceof BasicArrayTypeInfo) { - return convertArray((ArrayNode) node, ((BasicArrayTypeInfo) info).getComponentInfo()); - } else if (info instanceof PrimitiveArrayTypeInfo && - ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { - return convertByteArray((TextNode) node); - } else { - throw new RuntimeException("Unable to support type " + info.toString() + " yet"); - } + // -------------------------------------------------------------------------------------------- + + private interface RuntimeConverter extends Serializable { + Object convert(JsonNode node); } - private Object[] convertArray(ArrayNode node, TypeInformation elementType) { - final Object[] array = (Object[]) Array.newInstance(elementType.getTypeClass(), node.size()); - for (int i = 0; i < node.size(); i++) { - array[i] = convert(node.get(i), elementType); - } - return array; + private static RuntimeConverter createRowRuntimeConverter( + RowTypeInfo rowTypeInfo, + boolean ignoreParseErrors, + boolean isTopLevel) { + final TypeInformation[] fieldTypes = rowTypeInfo.getFieldTypes(); + final String[] fieldNames = rowTypeInfo.getFieldNames(); + + final RuntimeConverter[] fieldConverters = + createFieldRuntimeConverters(ignoreParseErrors, fieldTypes); + + return assembleRowRuntimeConverter(ignoreParseErrors, isTopLevel, fieldNames, fieldConverters); } - private byte[] convertByteArray(TextNode node) { - try { - return node.asText().getBytes(charset); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("Unsupport encoding charset" + charset, e); + private static RuntimeConverter[] createFieldRuntimeConverters(boolean ignoreParseErrors, TypeInformation[] 
fieldTypes) { + final RuntimeConverter[] fieldConverters = new RuntimeConverter[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + fieldConverters[i] = createNullableRuntimeConverter(fieldTypes[i], ignoreParseErrors); } + return fieldConverters; } - public void setCharset(String charset) { - this.charset = charset; - } + private static RuntimeConverter assembleRowRuntimeConverter( + boolean ignoreParseErrors, + boolean isTopLevel, + String[] fieldNames, + RuntimeConverter[] fieldConverters) { + final int rowArity = fieldNames.length; - public void setFieldDelimiter(String s) { - if (s.length() != 1) { - throw new RuntimeException("FieldDelimiter's length must be one !"); - } - this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(s.charAt(0)).build(); + return (node) -> { + final int nodeSize = node.size(); + + validateArity(rowArity, nodeSize, ignoreParseErrors); + + final Row row = new Row(rowArity); + for (int i = 0; i < Math.min(rowArity, nodeSize); i++) { + // Jackson only supports mapping by name in the first level + if (isTopLevel) { + row.setField(i, fieldConverters[i].convert(node.get(fieldNames[i]))); + } else { + row.setField(i, fieldConverters[i].convert(node.get(i))); + } + } + return row; + }; } - public void setArrayElementDelimiter(String s) { - this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(s).build(); - this.objectReader = objectReader.with(csvSchema); + private static RuntimeConverter createNullableRuntimeConverter( + TypeInformation info, + boolean ignoreParseErrors) { + final RuntimeConverter valueConverter = createRuntimeConverter(info, ignoreParseErrors); + return (node) -> { + if (node.isNull()) { + return null; + } + try { + return valueConverter.convert(node); + } catch (Throwable t) { + if (!ignoreParseErrors) { + throw t; + } + return null; + } + }; } - public void setQuoteCharacter(char c) { - this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build(); - this.objectReader = objectReader.with(csvSchema); + private static RuntimeConverter createRuntimeConverter(TypeInformation info, boolean ignoreParseErrors) { + if (info.equals(Types.VOID)) { + return (node) -> null; + } else if (info.equals(Types.STRING)) { + return JsonNode::asText; + } else if (info.equals(Types.BOOLEAN)) { + return (node) -> Boolean.valueOf(node.asText().trim()); + } else if (info.equals(Types.BYTE)) { + return (node) -> Byte.valueOf(node.asText().trim()); + } else if (info.equals(Types.SHORT)) { + return (node) -> Short.valueOf(node.asText().trim()); + } else if (info.equals(Types.INT)) { + return (node) -> Integer.valueOf(node.asText().trim()); + } else if (info.equals(Types.LONG)) { + return (node) -> Long.valueOf(node.asText().trim()); + } else if (info.equals(Types.FLOAT)) { + return (node) -> Float.valueOf(node.asText().trim()); + } else if (info.equals(Types.DOUBLE)) { + return (node) -> Double.valueOf(node.asText().trim()); + } else if (info.equals(Types.BIG_DEC)) { + return (node) -> new BigDecimal(node.asText().trim()); + } else if (info.equals(Types.BIG_INT)) { + return (node) -> new BigInteger(node.asText().trim()); + } else if (info.equals(Types.SQL_DATE)) { + return (node) -> Date.valueOf(node.asText()); + } else if (info.equals(Types.SQL_TIME)) { + return (node) -> Time.valueOf(node.asText()); + } else if (info.equals(Types.SQL_TIMESTAMP)) { + return (node) -> Timestamp.valueOf(node.asText()); + } else if (info instanceof RowTypeInfo) { + final RowTypeInfo rowTypeInfo = (RowTypeInfo) info; + return 
createRowRuntimeConverter(rowTypeInfo, ignoreParseErrors, false); + } else if (info instanceof BasicArrayTypeInfo) { + return createObjectArrayRuntimeConverter( + ((BasicArrayTypeInfo) info).getComponentInfo(), + ignoreParseErrors); + } else if (info instanceof ObjectArrayTypeInfo) { + return createObjectArrayRuntimeConverter( + ((ObjectArrayTypeInfo) info).getComponentInfo(), + ignoreParseErrors); + } else if (info instanceof PrimitiveArrayTypeInfo && + ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { + return createByteArrayRuntimeConverter(ignoreParseErrors); + } else { + throw new RuntimeException("Unsupported type information '" + info + "'."); + } } - public void setEscapeCharacter(char c) { - this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build(); - this.objectReader = objectReader.with(csvSchema); + private static RuntimeConverter createObjectArrayRuntimeConverter( + TypeInformation elementType, + boolean ignoreParseErrors) { + final Class elementClass = elementType.getTypeClass(); + final RuntimeConverter elementConverter = createNullableRuntimeConverter(elementType, ignoreParseErrors); + + return (node) -> { + final int nodeSize = node.size(); + final Object[] array = (Object[]) Array.newInstance(elementClass, nodeSize); + for (int i = 0; i < nodeSize; i++) { + array[i] = elementConverter.convert(node.get(i)); + } + return array; + }; } - public void setNullValue(String s) { - this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build(); - this.objectReader = objectReader.with(csvSchema); + private static RuntimeConverter createByteArrayRuntimeConverter(boolean ignoreParseErrors) { + return (node) -> { + try { + return node.binaryValue(); + } catch (IOException e) { + if (!ignoreParseErrors) { + throw new RuntimeException("Unable to deserialize byte array.", e); + } + return null; + } + }; } - @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != this.getClass()) { - return false; + private static void validateArity(int expected, int actual, boolean ignoreParseErrors) { + if (expected != actual && !ignoreParseErrors) { + throw new RuntimeException("Row length mismatch. 
" + expected + + " fields expected but was " + actual + "."); } - if (this == o) { - return true; - } - final CsvRowDeserializationSchema that = (CsvRowDeserializationSchema) o; - return rowTypeInfo.equals(that.rowTypeInfo) && - csvSchema.toString().equals(that.csvSchema.toString()); } } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java index e9c65bb9f5a..7816faf0325 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java @@ -21,123 +21,117 @@ package org.apache.flink.formats.csv; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.descriptors.CsvValidator; import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.FormatDescriptorValidator; -import org.apache.flink.table.descriptors.SchemaValidator; import org.apache.flink.table.factories.DeserializationSchemaFactory; import org.apache.flink.table.factories.SerializationSchemaFactory; +import org.apache.flink.table.factories.TableFormatFactoryBase; import org.apache.flink.types.Row; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; /** - * Table format for providing configured instances of CSV-to-row {@link SerializationSchema} + * Table format factory for providing configured instances of CSV-to-row {@link SerializationSchema} * and {@link DeserializationSchema}. */ -public final class CsvRowFormatFactory implements SerializationSchemaFactory, - DeserializationSchemaFactory { +public final class CsvRowFormatFactory extends TableFormatFactoryBase + implements SerializationSchemaFactory, DeserializationSchemaFactory { - @Override - public Map requiredContext() { - final Map context = new HashMap<>(); - context.put(FormatDescriptorValidator.FORMAT_TYPE(), CsvValidator.FORMAT_TYPE_VALUE()); - context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1"); - return context; + public CsvRowFormatFactory() { + super(CsvValidator.FORMAT_TYPE_VALUE, 1, true); } @Override - public boolean supportsSchemaDerivation() { - return true; - } - - @Override - public List supportedProperties() { + public List supportedFormatProperties() { final List properties = new ArrayList<>(); - properties.add(CsvValidator.FORMAT_FIELDS() + ".#." + DescriptorProperties.TYPE()); - properties.add(CsvValidator.FORMAT_FIELDS() + ".#." + DescriptorProperties.NAME()); - properties.add(CsvValidator.FORMAT_FIELD_DELIMITER()); - properties.add(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER()); - properties.add(CsvValidator.FORMAT_QUOTE_CHARACTER()); - properties.add(CsvValidator.FORMAT_ESCAPE_CHARACTER()); - properties.add(CsvValidator.FORMAT_BYTES_CHARSET()); - properties.add(CsvValidator.FORMAT_NULL_VALUE()); - properties.add(SchemaValidator.SCHEMA() + ".#." + SchemaValidator.SCHEMA_TYPE()); - properties.add(SchemaValidator.SCHEMA() + ".#." 
+ SchemaValidator.SCHEMA_NAME()); + properties.add(CsvValidator.FORMAT_FIELD_DELIMITER); + properties.add(CsvValidator.FORMAT_LINE_DELIMITER); + properties.add(CsvValidator.FORMAT_QUOTE_CHARACTER); + properties.add(CsvValidator.FORMAT_ALLOW_COMMENTS); + properties.add(CsvValidator.FORMAT_IGNORE_PARSE_ERRORS); + properties.add(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER); + properties.add(CsvValidator.FORMAT_ESCAPE_CHARACTER); + properties.add(CsvValidator.FORMAT_NULL_LITERAL); + properties.add(CsvValidator.FORMAT_SCHEMA); return properties; } @Override public DeserializationSchema createDeserializationSchema(Map properties) { - final DescriptorProperties descriptorProperties = validateAndGetProperties(properties); + final DescriptorProperties descriptorProperties = getValidatedProperties(properties); - final CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema( + final CsvRowDeserializationSchema.Builder schemaBuilder = new CsvRowDeserializationSchema.Builder( createTypeInformation(descriptorProperties)); - // update csv schema with properties - descriptorProperties.getOptionalString(CsvValidator.FORMAT_FIELD_DELIMITER()) - .ifPresent(schema::setFieldDelimiter); - descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER()) - .ifPresent(schema::setArrayElementDelimiter); - descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER()) - .ifPresent(schema::setQuoteCharacter); - descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER()) - .ifPresent(schema::setEscapeCharacter); - descriptorProperties.getOptionalString(CsvValidator.FORMAT_BYTES_CHARSET()) - .ifPresent(schema::setCharset); - descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_VALUE()) - .ifPresent(schema::setCharset); - - return new CsvRowDeserializationSchema(createTypeInformation(descriptorProperties)); + descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_FIELD_DELIMITER) + .ifPresent(schemaBuilder::setFieldDelimiter); + + descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER) + .ifPresent(schemaBuilder::setQuoteCharacter); + + descriptorProperties.getOptionalBoolean(CsvValidator.FORMAT_ALLOW_COMMENTS) + .ifPresent(schemaBuilder::setAllowComments); + + descriptorProperties.getOptionalBoolean(CsvValidator.FORMAT_IGNORE_PARSE_ERRORS) + .ifPresent(schemaBuilder::setIgnoreParseErrors); + + descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER) + .ifPresent(schemaBuilder::setArrayElementDelimiter); + + descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER) + .ifPresent(schemaBuilder::setEscapeCharacter); + + descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_LITERAL) + .ifPresent(schemaBuilder::setNullLiteral); + + return schemaBuilder.build(); } @Override public SerializationSchema createSerializationSchema(Map properties) { - final DescriptorProperties descriptorProperties = validateAndGetProperties(properties); + final DescriptorProperties descriptorProperties = getValidatedProperties(properties); - final CsvRowSerializationSchema schema = new CsvRowSerializationSchema( + final CsvRowSerializationSchema.Builder schemaBuilder = new CsvRowSerializationSchema.Builder( createTypeInformation(descriptorProperties)); - // update csv schema with properties - descriptorProperties.getOptionalString(CsvValidator.FORMAT_FIELD_DELIMITER()) - .ifPresent(schema::setFieldDelimiter); - 
descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER()) - .ifPresent(schema::setArrayElementDelimiter); - descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER()) - .ifPresent(schema::setQuoteCharacter); - descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER()) - .ifPresent(schema::setEscapeCharacter); - descriptorProperties.getOptionalString(CsvValidator.FORMAT_BYTES_CHARSET()) - .ifPresent(schema::setCharset); - descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_VALUE()) - .ifPresent(schema::setCharset); - - return new CsvRowSerializationSchema(createTypeInformation(descriptorProperties)); + descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_FIELD_DELIMITER) + .ifPresent(schemaBuilder::setFieldDelimiter); + + descriptorProperties.getOptionalString(CsvValidator.FORMAT_LINE_DELIMITER) + .ifPresent(schemaBuilder::setLineDelimiter); + + descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER) + .ifPresent(schemaBuilder::setQuoteCharacter); + + descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER) + .ifPresent(schemaBuilder::setArrayElementDelimiter); + + descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER) + .ifPresent(schemaBuilder::setEscapeCharacter); + + descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_LITERAL) + .ifPresent(schemaBuilder::setNullLiteral); + + return schemaBuilder.build(); } - private static DescriptorProperties validateAndGetProperties(Map propertiesMap) { + private static DescriptorProperties getValidatedProperties(Map propertiesMap) { final DescriptorProperties descriptorProperties = new DescriptorProperties(true); descriptorProperties.putProperties(propertiesMap); - // validate new CsvValidator().validate(descriptorProperties); return descriptorProperties; } - /** - * Create a {@link TypeInformation} based on the "format-fields" in {@link CsvValidator}. 
- * @param descriptorProperties descriptor properties - * @return {@link TypeInformation} - */ private static TypeInformation createTypeInformation(DescriptorProperties descriptorProperties) { - if (descriptorProperties.getOptionalTableSchema(CsvValidator.FORMAT_FIELDS()).isPresent()) { - return descriptorProperties.getTableSchema(CsvValidator.FORMAT_FIELDS()).toRowType(); + if (descriptorProperties.containsKey(CsvValidator.FORMAT_SCHEMA)) { + return (RowTypeInfo) descriptorProperties.getType(CsvValidator.FORMAT_SCHEMA); } else { - return SchemaValidator.deriveFormatFields(descriptorProperties).toRowType(); + return deriveSchema(descriptorProperties.asMap()).toRowType(); } } } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java index 1476159d643..a9f03c56474 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java @@ -25,77 +25,108 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import com.fasterxml.jackson.dataformat.csv.CsvSchema; -import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder; -import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema.Column; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; /** - * Converting functions that related to {@link CsvSchema}. - * In {@link CsvSchema}, there are four types(string,number,boolean - * and array), in order to satisfy various flink types, this class - * sorts out instances of {@link TypeInformation} and convert them to - * one of CsvSchema's types. + * Converter functions that covert Flink's type information to Jackson's {@link CsvSchema}. + * + *

In {@link CsvSchema}, there are four types (string, number, boolean, and array). In order + * to satisfy various Flink types, this class sorts out instances of {@link TypeInformation} that + * are not supported. It converts supported types to one of CsvSchema's types. + * + *

Note: Changes in this class need to be kept in sync with the corresponding runtime + * classes {@link CsvRowDeserializationSchema} and {@link CsvRowSerializationSchema}. */ public final class CsvRowSchemaConverter { + private CsvRowSchemaConverter() { + // private + } + /** * Types that can be converted to ColumnType.NUMBER. + * + *

From Jackson: Value should be a number, but literals "null", "true" and "false" are also + * understood, and an empty String is considered null. Values are also trimmed (leading/trailing + * white space). Other non-numeric Strings may cause parsing exception. */ private static final HashSet> NUMBER_TYPES = - new HashSet<>(Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT, - Types.BIG_DEC, Types.BIG_INT)); + new HashSet<>(Arrays.asList(Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.DOUBLE, + Types.FLOAT, Types.BIG_DEC, Types.BIG_INT)); /** * Types that can be converted to ColumnType.STRING. + * + *

From Jackson: Default type if not explicitly defined; no type-inference is performed, + * and value is not trimmed. */ private static final HashSet> STRING_TYPES = - new HashSet<>(Arrays.asList(Types.STRING, Types.SQL_DATE, - Types.SQL_TIME, Types.SQL_TIMESTAMP)); + new HashSet<>(Arrays.asList(Types.STRING, Types.SQL_DATE, Types.SQL_TIME, Types.SQL_TIMESTAMP)); /** * Types that can be converted to ColumnType.BOOLEAN. + * + *

From Jackson: Value is expected to be a boolean ("true", "false") String, or "null", or + * empty String (equivalent to null). Values are trimmed (leading/trailing white space). + * Values other than indicated above may result in an exception. */ private static final HashSet> BOOLEAN_TYPES = - new HashSet<>(Collections.singletonList(Types.BOOLEAN)); + new HashSet<>(Arrays.asList(Types.BOOLEAN, Types.VOID)); /** * Convert {@link RowTypeInfo} to {@link CsvSchema}. */ - public static CsvSchema rowTypeToCsvSchema(RowTypeInfo rowType) { - Builder builder = new CsvSchema.Builder(); - String[] fields = rowType.getFieldNames(); - TypeInformation[] infos = rowType.getFieldTypes(); + public static CsvSchema convert(RowTypeInfo rowType) { + final Builder builder = new CsvSchema.Builder(); + final String[] fields = rowType.getFieldNames(); + final TypeInformation[] types = rowType.getFieldTypes(); for (int i = 0; i < rowType.getArity(); i++) { - builder.addColumn(new Column(i, fields[i], convertType(infos[i]))); + builder.addColumn(new Column(i, fields[i], convertType(fields[i], types[i]))); } return builder.build(); } /** - * Convert {@link TypeInformation} to {@link CsvSchema.ColumnType} - * based on their catogories. + * Convert {@link TypeInformation} to {@link CsvSchema.ColumnType} based on Jackson's categories. */ - private static CsvSchema.ColumnType convertType(TypeInformation info) { + private static CsvSchema.ColumnType convertType(String fieldName, TypeInformation info) { if (STRING_TYPES.contains(info)) { return CsvSchema.ColumnType.STRING; } else if (NUMBER_TYPES.contains(info)) { return CsvSchema.ColumnType.NUMBER; } else if (BOOLEAN_TYPES.contains(info)) { return CsvSchema.ColumnType.BOOLEAN; - } else if (info instanceof ObjectArrayTypeInfo - || info instanceof BasicArrayTypeInfo - || info instanceof RowTypeInfo) { + } else if (info instanceof ObjectArrayTypeInfo) { + validateNestedField(fieldName, ((ObjectArrayTypeInfo) info).getComponentInfo()); + return CsvSchema.ColumnType.ARRAY; + } else if (info instanceof BasicArrayTypeInfo) { + validateNestedField(fieldName, ((BasicArrayTypeInfo) info).getComponentInfo()); + return CsvSchema.ColumnType.ARRAY; + } else if (info instanceof RowTypeInfo) { + final TypeInformation[] types = ((RowTypeInfo) info).getFieldTypes(); + for (TypeInformation type : types) { + validateNestedField(fieldName, type); + } return CsvSchema.ColumnType.ARRAY; } else if (info instanceof PrimitiveArrayTypeInfo && - ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { + ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { return CsvSchema.ColumnType.STRING; } else { - throw new RuntimeException("Unable to support " + info.toString() - + " yet"); + throw new IllegalArgumentException( + "Unsupported type information '" + info.toString() + "' for field '" + fieldName + "'."); + } + } + + private static void validateNestedField(String fieldName, TypeInformation info) { + if (!NUMBER_TYPES.contains(info) && !STRING_TYPES.contains(info) && !BOOLEAN_TYPES.contains(info)) { + throw new IllegalArgumentException( + "Only simple types are supported in the second level nesting of fields '" + + fieldName + "' but was: " + info); } } } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java index f8ced687dea..a65554f8cab 100644 --- 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java @@ -18,30 +18,30 @@ package org.apache.flink.formats.csv; -import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ContainerNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.dataformat.csv.CsvMapper; -import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; -import java.io.UnsupportedEncodingException; +import java.io.Serializable; import java.math.BigDecimal; import java.math.BigInteger; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Objects; /** * Serialization schema that serializes an object of Flink types into a CSV bytes. @@ -51,33 +51,105 @@ import java.sql.Timestamp; * *

Result byte[] messages can be deserialized using {@link CsvRowDeserializationSchema}. */ -@Public +@PublicEvolving public final class CsvRowSerializationSchema implements SerializationSchema { - /** Schema describing the input csv data. */ - private CsvSchema csvSchema; + private static final long serialVersionUID = 2098447220136965L; - /** Type information describing the input csv data. */ - private TypeInformation rowTypeInfo; + /** Type information describing the input CSV data. */ + private final RowTypeInfo typeInfo; + + /** Runtime instance that performs the actual work. */ + private final RuntimeConverter runtimeConverter; /** CsvMapper used to write {@link JsonNode} into bytes. */ - private CsvMapper csvMapper = new CsvMapper(); + private final CsvMapper csvMapper; + + /** Schema describing the input CSV data. */ + private final CsvSchema csvSchema; + + /** Object writer used to write rows. It is configured by {@link CsvSchema}. */ + private final ObjectWriter objectWriter; /** Reusable object node. */ - private ObjectNode root; + private transient ObjectNode root; - /** Charset for byte[]. */ - private String charset = "UTF-8"; + private CsvRowSerializationSchema( + RowTypeInfo typeInfo, + CsvSchema csvSchema) { + this.typeInfo = typeInfo; + this.runtimeConverter = createRowRuntimeConverter(typeInfo, true); + this.csvMapper = new CsvMapper(); + this.csvSchema = csvSchema; + this.objectWriter = csvMapper.writer(csvSchema); + } /** - * Create a {@link CsvRowSerializationSchema} with given {@link TypeInformation}. - * @param rowTypeInfo type information used to create schem. + * A builder for creating a {@link CsvRowSerializationSchema}. */ - CsvRowSerializationSchema(TypeInformation rowTypeInfo) { - Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not be null !"); - this.rowTypeInfo = rowTypeInfo; - this.csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo); - this.setNullValue("null"); + @PublicEvolving + public static class Builder { + + private final RowTypeInfo typeInfo; + private CsvSchema csvSchema; + + /** + * Creates a {@link CsvRowSerializationSchema} expecting the given {@link TypeInformation}. + * + * @param typeInfo type information used to create schema. + */ + public Builder(TypeInformation typeInfo) { + Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + + if (!(typeInfo instanceof RowTypeInfo)) { + throw new IllegalArgumentException("Row type information expected."); + } + + this.typeInfo = (RowTypeInfo) typeInfo; + this.csvSchema = CsvRowSchemaConverter.convert((RowTypeInfo) typeInfo); + } + + public Builder setFieldDelimiter(char c) { + this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(c).build(); + return this; + } + + public Builder setLineDelimiter(String delimiter) { + Preconditions.checkNotNull(delimiter, "Delimiter must not be null."); + if (!delimiter.equals("\n") && !delimiter.equals("\r") && !delimiter.equals("\r\n")) { + throw new IllegalArgumentException( + "Unsupported new line delimiter. 
Only \\n, \\r, or \\r\\n are supported."); + } + this.csvSchema = this.csvSchema.rebuild().setLineSeparator(delimiter).build(); + return this; + } + + public Builder setArrayElementDelimiter(String delimiter) { + Preconditions.checkNotNull(delimiter, "Delimiter must not be null."); + this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build(); + return this; + } + + public Builder setQuoteCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build(); + return this; + } + + public Builder setEscapeCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build(); + return this; + } + + public Builder setNullLiteral(String s) { + this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build(); + return this; + } + + public CsvRowSerializationSchema build() { + return new CsvRowSerializationSchema( + typeInfo, + csvSchema); + } } @Override @@ -86,157 +158,177 @@ public final class CsvRowSerializationSchema implements SerializationSchema root = csvMapper.createObjectNode(); } try { - convertRow(root, row, (RowTypeInfo) rowTypeInfo); - return csvMapper.writer(csvSchema).writeValueAsBytes(root); - } catch (JsonProcessingException e) { - throw new RuntimeException("Could not serialize row '" + row + "'. " + - "Make sure that the schema matches the input.", e); + runtimeConverter.convert(csvMapper, root, row); + return objectWriter.writeValueAsBytes(root); + } catch (Throwable t) { + throw new RuntimeException("Could not serialize row '" + row + "'.", t); } } - private void convertRow(ObjectNode reuse, Row row, RowTypeInfo rowTypeInfo) { - if (reuse == null) { - reuse = csvMapper.createObjectNode(); - } - if (row.getArity() != rowTypeInfo.getFieldNames().length) { - throw new IllegalStateException(String.format( - "Number of elements in the row '%s' is different from number of field names: %d", - row, rowTypeInfo.getFieldNames().length)); + @Override + public boolean equals(Object o) { + if (o == null || o.getClass() != this.getClass()) { + return false; } - TypeInformation[] types = rowTypeInfo.getFieldTypes(); - String[] fields = rowTypeInfo.getFieldNames(); - for (int i = 0; i < types.length; i++) { - String columnName = fields[i]; - Object obj = row.getField(i); - reuse.set(columnName, convert(reuse, obj, types[i], false)); + if (this == o) { + return true; } + final CsvRowSerializationSchema that = (CsvRowSerializationSchema) o; + final CsvSchema otherSchema = that.csvSchema; + + return typeInfo.equals(that.typeInfo) && + csvSchema.getColumnSeparator() == otherSchema.getColumnSeparator() && + Arrays.equals(csvSchema.getLineSeparator(), otherSchema.getLineSeparator()) && + csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator()) && + csvSchema.getQuoteChar() == otherSchema.getQuoteChar() && + csvSchema.getEscapeChar() == otherSchema.getEscapeChar() && + Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue()); } - /** - * Converts an object to a JsonNode. - * @param container {@link ContainerNode} that creates {@link JsonNode}. - * @param obj Object that used to {@link JsonNode}. - * @param info Type infomation that decides the type of {@link JsonNode}. - * @param nested variable that indicates whether the obj is in a nested structure - * like a string in an array. - * @return result after converting. 
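A minimal usage sketch of the builder API introduced above (illustrative only; the class name, row type, and option values below are examples and not part of this change):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
import org.apache.flink.formats.csv.CsvRowSerializationSchema;
import org.apache.flink.types.Row;

public class CsvRowSchemaBuilderSketch {

	public static void main(String[] args) throws Exception {
		// (STRING, INT) row type shared by both schemas
		TypeInformation<Row> rowInfo = Types.ROW(Types.STRING, Types.INT);

		// Row -> CSV bytes
		CsvRowSerializationSchema serSchema = new CsvRowSerializationSchema.Builder(rowInfo)
			.setFieldDelimiter(';')
			.setQuoteCharacter('\'')
			.setNullLiteral("n/a")
			.build();

		// CSV bytes -> Row, configured symmetrically
		CsvRowDeserializationSchema deserSchema = new CsvRowDeserializationSchema.Builder(rowInfo)
			.setFieldDelimiter(';')
			.setQuoteCharacter('\'')
			.setNullLiteral("n/a")
			.build();

		byte[] bytes = serSchema.serialize(Row.of("Hello", 42));
		Row row = deserSchema.deserialize(bytes);
		System.out.println(row); // e.g. Hello,42
	}
}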
- */ - private JsonNode convert(ContainerNode container, Object obj, TypeInformation info, Boolean nested) { - if (obj == null) { - return container.nullNode(); - } - if (info == Types.STRING) { - return container.textNode((String) obj); - } else if (info == Types.LONG) { - return container.numberNode((Long) obj); - } else if (info == Types.INT) { - return container.numberNode((Integer) obj); - } else if (info == Types.DOUBLE) { - return container.numberNode((Double) obj); - } else if (info == Types.FLOAT) { - return container.numberNode((Float) obj); - } else if (info == Types.BIG_DEC) { - return container.numberNode(new BigDecimal(String.valueOf(obj))); - } else if (info == Types.BIG_INT) { - return container.numberNode(BigInteger.valueOf(Long.valueOf(String.valueOf(obj)))); - } else if (info == Types.SQL_DATE) { - return container.textNode(Date.valueOf(String.valueOf(obj)).toString()); - } else if (info == Types.SQL_TIME) { - return container.textNode(Time.valueOf(String.valueOf(obj)).toString()); - } else if (info == Types.SQL_TIMESTAMP) { - return container.textNode(Timestamp.valueOf(String.valueOf(obj)).toString()); - } else if (info == Types.BOOLEAN) { - return container.booleanNode((Boolean) obj); - } else if (info instanceof RowTypeInfo){ - if (nested) { - throw new RuntimeException("Unable to support nested row type " + info.toString() + " yet"); - } - return convertArray((Row) obj, (RowTypeInfo) info); - } else if (info instanceof BasicArrayTypeInfo) { - if (nested) { - throw new RuntimeException("Unable to support nested array type " + info.toString() + " yet"); - } - return convertArray((Object[]) obj, - ((BasicArrayTypeInfo) info).getComponentInfo()); - } else if (info instanceof PrimitiveArrayTypeInfo && - ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { - /* We converts byte[] to TextNode instead of BinaryNode here, - because the instance of BinaryNode will be serialized to base64 string in - {@link com.fasterxml.jackson.databind.node.BinaryNode#serialize(JsonGenerator, SerializerProvider)}, - which is unacceptable for users. - */ - try { - return container.textNode(new String((byte[]) obj, charset)); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("Unsupport encoding charset " + charset, e); - } - } else { - throw new RuntimeException("Unable to support type " + info.toString() + " yet"); - } + @Override + public int hashCode() { + return Objects.hash( + typeInfo, + csvSchema.getColumnSeparator(), + csvSchema.getLineSeparator(), + csvSchema.getArrayElementSeparator(), + csvSchema.getQuoteChar(), + csvSchema.getEscapeChar(), + csvSchema.getNullValue()); } - /** - * Use {@link ArrayNode} to represents a row. - */ - private ArrayNode convertArray(Row row, RowTypeInfo rowTypeInfo) { - ArrayNode arrayNode = csvMapper.createArrayNode(); - TypeInformation[] types = rowTypeInfo.getFieldTypes(); - String[] fields = rowTypeInfo.getFieldNames(); - for (int i = 0; i < fields.length; i++) { - arrayNode.add(convert(arrayNode, row.getField(i), types[i], true)); - } - return arrayNode; + // -------------------------------------------------------------------------------------------- + + private interface RuntimeConverter extends Serializable { + JsonNode convert(CsvMapper csvMapper, ContainerNode container, Object obj); } - /** - * Use {@link ArrayNode} to represents an array. 
- */ - private ArrayNode convertArray(Object[] obj, TypeInformation elementInfo) { - ArrayNode arrayNode = csvMapper.createArrayNode(); - for (Object elementObj : obj) { - arrayNode.add(convert(arrayNode, elementObj, elementInfo, true)); - } - return arrayNode; + private static RuntimeConverter createRowRuntimeConverter(RowTypeInfo rowTypeInfo, boolean isTopLevel) { + final TypeInformation[] fieldTypes = rowTypeInfo.getFieldTypes(); + final String[] fieldNames = rowTypeInfo.getFieldNames(); + + final RuntimeConverter[] fieldConverters = createFieldRuntimeConverters(fieldTypes); + + return assembleRowRuntimeConverter(isTopLevel, fieldNames, fieldConverters); } - public void setCharset(String charset) { - this.charset = charset; + private static RuntimeConverter[] createFieldRuntimeConverters(TypeInformation[] fieldTypes) { + final RuntimeConverter[] fieldConverters = new RuntimeConverter[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + fieldConverters[i] = createNullableRuntimeConverter(fieldTypes[i]); + } + return fieldConverters; } - public void setFieldDelimiter(String s) { - if (s.length() != 1) { - throw new RuntimeException("FieldDelimiter's length must be one !"); + private static RuntimeConverter assembleRowRuntimeConverter( + boolean isTopLevel, + String[] fieldNames, + RuntimeConverter[] fieldConverters) { + final int rowArity = fieldNames.length; + // top level reuses the object node container + if (isTopLevel) { + return (csvMapper, container, obj) -> { + final Row row = (Row) obj; + + validateArity(rowArity, row.getArity()); + + final ObjectNode objectNode = (ObjectNode) container; + for (int i = 0; i < rowArity; i++) { + objectNode.set( + fieldNames[i], + fieldConverters[i].convert(csvMapper, container, row.getField(i))); + } + return objectNode; + }; + } else { + return (csvMapper, container, obj) -> { + final Row row = (Row) obj; + + validateArity(rowArity, row.getArity()); + + final ArrayNode arrayNode = csvMapper.createArrayNode(); + for (int i = 0; i < rowArity; i++) { + arrayNode.add(fieldConverters[i].convert(csvMapper, arrayNode, row.getField(i))); + } + return arrayNode; + }; } - this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(s.charAt(0)).build(); } - public void setArrayElementDelimiter(String s) { - this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(s).build(); + private static RuntimeConverter createNullableRuntimeConverter(TypeInformation info) { + final RuntimeConverter valueConverter = createRuntimeConverter(info); + return (csvMapper, container, obj) -> { + if (obj == null) { + return container.nullNode(); + } + return valueConverter.convert(csvMapper, container, obj); + }; } - public void setQuoteCharacter(char c) { - this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build(); + private static RuntimeConverter createRuntimeConverter(TypeInformation info) { + if (info.equals(Types.VOID)) { + return (csvMapper, container, obj) -> container.nullNode(); + } else if (info.equals(Types.STRING)) { + return (csvMapper, container, obj) -> container.textNode((String) obj); + } else if (info.equals(Types.BOOLEAN)) { + return (csvMapper, container, obj) -> container.booleanNode((Boolean) obj); + } else if (info.equals(Types.BYTE)) { + return (csvMapper, container, obj) -> container.numberNode((Byte) obj); + } else if (info.equals(Types.SHORT)) { + return (csvMapper, container, obj) -> container.numberNode((Short) obj); + } else if (info.equals(Types.INT)) { + return (csvMapper, container, obj) -> 
container.numberNode((Integer) obj); + } else if (info.equals(Types.LONG)) { + return (csvMapper, container, obj) -> container.numberNode((Long) obj); + } else if (info.equals(Types.FLOAT)) { + return (csvMapper, container, obj) -> container.numberNode((Float) obj); + } else if (info.equals(Types.DOUBLE)) { + return (csvMapper, container, obj) -> container.numberNode((Double) obj); + } else if (info.equals(Types.BIG_DEC)) { + return (csvMapper, container, obj) -> container.numberNode((BigDecimal) obj); + } else if (info.equals(Types.BIG_INT)) { + return (csvMapper, container, obj) -> container.numberNode((BigInteger) obj); + } else if (info.equals(Types.SQL_DATE)) { + return (csvMapper, container, obj) -> container.textNode(obj.toString()); + } else if (info.equals(Types.SQL_TIME)) { + return (csvMapper, container, obj) -> container.textNode(obj.toString()); + } else if (info.equals(Types.SQL_TIMESTAMP)) { + return (csvMapper, container, obj) -> container.textNode(obj.toString()); + } else if (info instanceof RowTypeInfo){ + return createRowRuntimeConverter((RowTypeInfo) info, false); + } else if (info instanceof BasicArrayTypeInfo) { + return createObjectArrayRuntimeConverter(((BasicArrayTypeInfo) info).getComponentInfo()); + } else if (info instanceof ObjectArrayTypeInfo) { + return createObjectArrayRuntimeConverter(((ObjectArrayTypeInfo) info).getComponentInfo()); + } else if (info instanceof PrimitiveArrayTypeInfo && + ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { + return createByteArrayRuntimeConverter(); + } + else { + throw new RuntimeException("Unsupported type information '" + info + "'."); + } } - public void setEscapeCharacter(char c) { - this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build(); + private static RuntimeConverter createObjectArrayRuntimeConverter(TypeInformation elementType) { + final RuntimeConverter elementConverter = createNullableRuntimeConverter(elementType); + return (csvMapper, container, obj) -> { + final Object[] array = (Object[]) obj; + final ArrayNode arrayNode = csvMapper.createArrayNode(); + for (Object element : array) { + arrayNode.add(elementConverter.convert(csvMapper, arrayNode, element)); + } + return arrayNode; + }; } - public void setNullValue(String s) { - this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build(); + private static RuntimeConverter createByteArrayRuntimeConverter() { + return (csvMapper, container, obj) -> container.binaryNode((byte[]) obj); } - @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != this.getClass()) { - return false; + private static void validateArity(int expected, int actual) { + if (expected != actual) { + throw new RuntimeException("Row length mismatch. " + expected + + " fields expected but was " + actual + "."); } - if (this == o) { - return true; - } - final CsvRowSerializationSchema that = (CsvRowSerializationSchema) o; - - return rowTypeInfo.equals(that.rowTypeInfo) && - csvSchema.toString().equals(that.csvSchema.toString()); } } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/table/descriptors/Csv.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/table/descriptors/Csv.java new file mode 100644 index 00000000000..c467f1f0023 --- /dev/null +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/table/descriptors/Csv.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.utils.TypeStringUtils; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.Map; + +import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ALLOW_COMMENTS; +import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER; +import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ESCAPE_CHARACTER; +import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_FIELD_DELIMITER; +import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_IGNORE_PARSE_ERRORS; +import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_LINE_DELIMITER; +import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_NULL_LITERAL; +import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_QUOTE_CHARACTER; +import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_SCHEMA; +import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_TYPE_VALUE; +import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA; + +/** + * Format descriptor for comma-separated values (CSV). + * + *
<p>
This descriptor aims to comply with RFC-4180 ("Common Format and MIME Type for + * Comma-Separated Values (CSV) Files") proposed by the Internet Engineering Task Force (IETF). + * + *
<p>
Note: This descriptor does not describe Flink's old non-standard CSV table + * source/sink. Currently, this descriptor can be used when writing to Kafka. The old one is + * still available under "org.apache.flink.table.descriptors.OldCsv" for stream/batch + * filesystem operations. + */ +@PublicEvolving +public class Csv extends FormatDescriptor { + + private DescriptorProperties internalProperties = new DescriptorProperties(true); + + /** + * Format descriptor for comma-separated values (CSV). + * + *
<p>
This descriptor aims to comply with RFC-4180 ("Common Format and MIME Type for + * Comma-Separated Values (CSV) Files) proposed by the Internet Engineering Task Force (IETF). + */ + public Csv() { + super(FORMAT_TYPE_VALUE, 1); + } + + /** + * Sets the field delimiter character (',' by default). + * + * @param delimiter the field delimiter character + */ + public Csv fieldDelimiter(char delimiter) { + internalProperties.putCharacter(FORMAT_FIELD_DELIMITER, delimiter); + return this; + } + + /** + * Sets the line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed). + * + * @param delimiter the line delimiter + */ + public Csv lineDelimiter(String delimiter) { + Preconditions.checkNotNull(delimiter); + internalProperties.putString(FORMAT_LINE_DELIMITER, delimiter); + return this; + } + + /** + * Sets the quote character for enclosing field values ('"' by default). + * + * @param quoteCharacter the quote character + */ + public Csv quoteCharacter(char quoteCharacter) { + internalProperties.putCharacter(FORMAT_QUOTE_CHARACTER, quoteCharacter); + return this; + } + + /** + * Ignores comment lines that start with '#' (disabled by default). If enabled, make sure to + * also ignore parse errors to allow empty rows. + */ + public Csv allowComments() { + internalProperties.putBoolean(FORMAT_ALLOW_COMMENTS, true); + return this; + } + + /** + * Skip fields and rows with parse errors instead of failing. Fields are set to {@code null} + * in case of errors. By default, an exception is thrown. + */ + public Csv ignoreParseErrors() { + internalProperties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, true); + return this; + } + + /** + * Sets the array element delimiter string for separating array or row element + * values (";" by default). + * + * @param delimiter the array element delimiter + */ + public Csv arrayElementDelimiter(String delimiter) { + Preconditions.checkNotNull(delimiter); + internalProperties.putString(FORMAT_ARRAY_ELEMENT_DELIMITER, delimiter); + return this; + } + + /** + * Sets the escape character for escaping values (disabled by default). + * + * @param escapeCharacter escaping character (e.g. backslash) + */ + public Csv escapeCharacter(char escapeCharacter) { + internalProperties.putCharacter(FORMAT_ESCAPE_CHARACTER, escapeCharacter); + return this; + } + + /** + * Sets the null literal string that is interpreted as a null value (disabled by default). + * + * @param nullLiteral null literal (e.g. "null" or "n/a") + */ + public Csv nullLiteral(String nullLiteral) { + Preconditions.checkNotNull(nullLiteral); + internalProperties.putString(FORMAT_NULL_LITERAL, nullLiteral); + return this; + } + + /** + * Sets the format schema with field names and the types. Required if schema is not derived. + * + * @param schemaType type information that describes the schema + */ + public Csv schema(TypeInformation schemaType) { + Preconditions.checkNotNull(schemaType); + internalProperties.putString(FORMAT_SCHEMA, TypeStringUtils.writeTypeInfo(schemaType)); + return this; + } + + /** + * Derives the format schema from the table's schema. Required if no format schema is defined. + * + *
<p>
This allows for defining schema information only once. + * + *
<p>
The names, types, and fields' order of the format are determined by the table's + * schema. Time attributes are ignored if their origin is not a field. A "from" definition + * is interpreted as a field renaming in the format. + */ + public Csv deriveSchema() { + internalProperties.putBoolean(FORMAT_DERIVE_SCHEMA, true); + return this; + } + + @Override + protected Map toFormatProperties() { + final DescriptorProperties properties = new DescriptorProperties(); + properties.putProperties(internalProperties); + return properties.asMap(); + } +} diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/table/descriptors/CsvValidator.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/table/descriptors/CsvValidator.java new file mode 100644 index 00000000000..bc539716813 --- /dev/null +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/table/descriptors/CsvValidator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; + +import java.util.Arrays; + +/** + * Validator for {@link Csv}. 
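A rough sketch of how the new descriptor maps to the "format.*" properties checked by this validator (illustrative only; the schema, delimiter, and literal values are examples and not part of this change):

import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.types.Row;

public class CsvDescriptorSketch {

	public static void main(String[] args) {
		TypeInformation<Row> formatSchema = Types.ROW_NAMED(
			new String[]{"a", "b"}, Types.STRING, Types.INT);

		// explicit format schema
		Map<String, String> withSchema = new Csv()
			.schema(formatSchema)
			.fieldDelimiter(';')
			.nullLiteral("n/a")
			.toProperties();
		// -> format.type=csv, format.schema=..., format.field-delimiter=;, format.null-literal=n/a

		// or derive the format schema from the table's schema
		Map<String, String> derived = new Csv()
			.deriveSchema()
			.toProperties();
		// -> format.type=csv, format.derive-schema=true

		System.out.println(withSchema);
		System.out.println(derived);
	}
}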
+ */ +@Internal +public class CsvValidator extends FormatDescriptorValidator { + + public static final String FORMAT_TYPE_VALUE = "csv"; + public static final String FORMAT_FIELD_DELIMITER = "format.field-delimiter"; + public static final String FORMAT_LINE_DELIMITER = "format.line-delimiter"; + public static final String FORMAT_QUOTE_CHARACTER = "format.quote-character"; + public static final String FORMAT_ALLOW_COMMENTS = "format.allow-comments"; + public static final String FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors"; + public static final String FORMAT_ARRAY_ELEMENT_DELIMITER = "format.array-element-delimiter"; + public static final String FORMAT_ESCAPE_CHARACTER = "format.escape-character"; + public static final String FORMAT_NULL_LITERAL = "format.null-literal"; + public static final String FORMAT_SCHEMA = "format.schema"; + + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + properties.validateString(FORMAT_FIELD_DELIMITER, true, 1, 1); + properties.validateEnumValues(FORMAT_LINE_DELIMITER, true, Arrays.asList("\r", "\n", "\r\n")); + properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1); + properties.validateBoolean(FORMAT_ALLOW_COMMENTS, true); + properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true); + properties.validateString(FORMAT_ARRAY_ELEMENT_DELIMITER, true, 1); + properties.validateString(FORMAT_ESCAPE_CHARACTER, true, 1, 1); + properties.validateBoolean(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA, true); + + final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA); + final boolean isDerived = properties + .getOptionalBoolean(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA) + .orElse(false); + if (isDerived && hasSchema) { + throw new ValidationException( + "Format cannot define a schema and derive from the table's schema at the same time."); + } else if (hasSchema) { + properties.validateType(FORMAT_SCHEMA, false, true); + } else if (!isDerived) { + throw new ValidationException( + "A definition of a schema or derivation from the table's schema is required."); + } + } +} diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java new file mode 100644 index 00000000000..ece2e45e187 --- /dev/null +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.types.Row; +import org.apache.flink.util.InstantiationUtil; + +import org.junit.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.function.Consumer; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests for {@link CsvRowSerializationSchema} and {@link CsvRowDeserializationSchema}. + */ +public class CsvRowDeSerializationSchemaTest { + + @Test + @SuppressWarnings("unchecked") + public void testSerializeDeserialize() throws Exception { + + testNullableField(Types.LONG, "null", null); + testNullableField(Types.STRING, "null", null); + testNullableField(Types.VOID, "null", null); + testNullableField(Types.STRING, "\"This is a test.\"", "This is a test."); + testNullableField(Types.STRING, "\"This is a test\n\r.\"", "This is a test\n\r."); + testNullableField(Types.BOOLEAN, "true", true); + testNullableField(Types.BOOLEAN, "null", null); + testNullableField(Types.BYTE, "124", (byte) 124); + testNullableField(Types.SHORT, "10000", (short) 10000); + testNullableField(Types.INT, "1234567", 1234567); + testNullableField(Types.LONG, "12345678910", 12345678910L); + testNullableField(Types.FLOAT, "0.33333334", 0.33333334f); + testNullableField(Types.DOUBLE, "0.33333333332", 0.33333333332d); + testNullableField(Types.BIG_DEC, + "\"1234.0000000000000000000000001\"", + new BigDecimal("1234.0000000000000000000000001")); + testNullableField(Types.BIG_INT, + "\"123400000000000000000000000000\"", + new BigInteger("123400000000000000000000000000")); + testNullableField(Types.SQL_DATE, "2018-10-12", Date.valueOf("2018-10-12")); + testNullableField(Types.SQL_TIME, "12:12:12", Time.valueOf("12:12:12")); + testNullableField( + Types.SQL_TIMESTAMP, + "\"2018-10-12 12:12:12.0\"", + Timestamp.valueOf("2018-10-12 12:12:12")); + testNullableField( + Types.ROW(Types.STRING, Types.INT, Types.BOOLEAN), + "Hello;42;false", + Row.of("Hello", 42, false)); + testNullableField( + Types.OBJECT_ARRAY(Types.STRING), + "a;b;c", + new String[] {"a", "b", "c"}); + testNullableField( + Types.OBJECT_ARRAY(Types.BYTE), + "12;4;null", + new Byte[] {12, 4, null}); + testNullableField( + (TypeInformation) Types.PRIMITIVE_ARRAY(Types.BYTE), + "awML", + new byte[] {107, 3, 11}); + } + + @Test + public void testSerializeDeserializeCustomizedProperties() throws Exception { + + final Consumer serConfig = (serSchemaBuilder) -> serSchemaBuilder + .setEscapeCharacter('*') + .setQuoteCharacter('\'') + .setArrayElementDelimiter(":") + .setFieldDelimiter(';'); + + final Consumer deserConfig = (deserSchemaBuilder) -> deserSchemaBuilder + .setEscapeCharacter('*') + .setQuoteCharacter('\'') + .setArrayElementDelimiter(":") + .setFieldDelimiter(';'); + + testField(Types.STRING, "123*'4**", "123'4*", deserConfig, ";"); + testField(Types.STRING, "'123''4**'", "123'4*", serConfig, deserConfig, ";"); + testField(Types.STRING, "'a;b*'c'", "a;b'c", deserConfig, ";"); + testField(Types.STRING, "'a;b''c'", "a;b'c", serConfig, deserConfig, ";"); + testField(Types.INT, " 12 ", 12, deserConfig, ";"); + testField(Types.INT, "12", 12, serConfig, deserConfig, ";"); + testField(Types.ROW(Types.STRING, 
Types.STRING), "1:hello", Row.of("1", "hello"), deserConfig, ";"); + testField(Types.ROW(Types.STRING, Types.STRING), "'1:hello'", Row.of("1", "hello"), serConfig, deserConfig, ";"); + testField(Types.ROW(Types.STRING, Types.STRING), "'1:hello world'", Row.of("1", "hello world"), serConfig, deserConfig, ";"); + testField(Types.STRING, "null", "null", serConfig, deserConfig, ";"); // string because null literal has not been set + } + + @Test + public void testDeserializeParseError() throws Exception { + try { + testDeserialization(false, false, "Test,null,Test"); // null not supported + fail("Missing field should cause failure."); + } catch (IOException e) { + // valid exception + } + } + + @Test + public void testDeserializeUnsupportedNull() throws Exception { + // unsupported null for integer + assertEquals(Row.of("Test", null, "Test"), testDeserialization(true, false, "Test,null,Test")); + } + + @Test + public void testDeserializeIncompleteRow() throws Exception { + // last two columns are missing + assertEquals(Row.of("Test", null, null), testDeserialization(true, false, "Test")); + } + + @Test + public void testDeserializeMoreColumnsThanExpected() throws Exception { + // one additional string column + assertNull(testDeserialization(true, false, "Test,12,Test,Test")); + } + + @Test + public void testDeserializeIgnoreComment() throws Exception { + // # is part of the string + assertEquals(Row.of("#Test", 12, "Test"), testDeserialization(false, false, "#Test,12,Test")); + } + + @Test + public void testDeserializeAllowComment() throws Exception { + // entire row is ignored + assertNull(testDeserialization(true, true, "#Test,12,Test")); + } + + @Test + public void testSerializationProperties() throws Exception { + final TypeInformation rowInfo = Types.ROW(Types.STRING, Types.INT, Types.STRING); + final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(rowInfo) + .setLineDelimiter("\r"); + + assertArrayEquals( + "Test,12,Hello\r".getBytes(), + serialize(serSchemaBuilder, Row.of("Test", 12, "Hello"))); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidNesting() throws Exception { + testNullableField(Types.ROW(Types.ROW(Types.STRING)), "FAIL", Row.of(Row.of("FAIL"))); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidType() throws Exception { + testNullableField(Types.GENERIC(java.util.Date.class), "FAIL", new java.util.Date()); + } + + private void testNullableField(TypeInformation fieldInfo, String string, T value) throws Exception { + testField( + fieldInfo, + string, + value, + (deserSchema) -> deserSchema.setNullLiteral("null"), + (serSchema) -> serSchema.setNullLiteral("null"), + ","); + } + + private void testField( + TypeInformation fieldInfo, + String csvValue, + T value, + Consumer serializationConfig, + Consumer deserializationConfig, + String fieldDelimiter) throws Exception { + final TypeInformation rowInfo = Types.ROW(Types.STRING, fieldInfo, Types.STRING); + final String expectedCsv = "BEGIN" + fieldDelimiter + csvValue + fieldDelimiter + "END\n"; + final Row expectedRow = Row.of("BEGIN", value, "END"); + + // serialization + final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(rowInfo); + serializationConfig.accept(serSchemaBuilder); + final byte[] serializedRow = serialize(serSchemaBuilder, expectedRow); + assertEquals(expectedCsv, new String(serializedRow)); + + // deserialization + final CsvRowDeserializationSchema.Builder 
deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(rowInfo); + deserializationConfig.accept(deserSchemaBuilder); + final Row deserializedRow = deserialize(deserSchemaBuilder, expectedCsv); + assertEquals(expectedRow, deserializedRow); + } + + private void testField( + TypeInformation fieldInfo, + String csvValue, + T value, + Consumer deserializationConfig, + String fieldDelimiter) throws Exception { + final TypeInformation rowInfo = Types.ROW(Types.STRING, fieldInfo, Types.STRING); + final String csv = "BEGIN" + fieldDelimiter + csvValue + fieldDelimiter + "END\n"; + final Row expectedRow = Row.of("BEGIN", value, "END"); + + // deserialization + final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(rowInfo); + deserializationConfig.accept(deserSchemaBuilder); + final Row deserializedRow = deserialize(deserSchemaBuilder, csv); + assertEquals(expectedRow, deserializedRow); + } + + private Row testDeserialization( + boolean allowParsingErrors, + boolean allowComments, + String string) throws Exception { + final TypeInformation rowInfo = Types.ROW(Types.STRING, Types.INT, Types.STRING); + final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(rowInfo) + .setIgnoreParseErrors(allowParsingErrors) + .setAllowComments(allowComments); + return deserialize(deserSchemaBuilder, string); + } + + private static byte[] serialize(CsvRowSerializationSchema.Builder serSchemaBuilder, Row row) throws Exception { + // we serialize and deserialize the schema to test runtime behavior + // when the schema is shipped to the cluster + final CsvRowSerializationSchema schema = InstantiationUtil.deserializeObject( + InstantiationUtil.serializeObject(serSchemaBuilder.build()), + CsvRowDeSerializationSchemaTest.class.getClassLoader()); + return schema.serialize(row); + } + + private static Row deserialize(CsvRowDeserializationSchema.Builder deserSchemaBuilder, String csv) throws Exception { + // we serialize and deserialize the schema to test runtime behavior + // when the schema is shipped to the cluster + final CsvRowDeserializationSchema schema = InstantiationUtil.deserializeObject( + InstantiationUtil.serializeObject(deserSchemaBuilder.build()), + CsvRowDeSerializationSchemaTest.class.getClassLoader()); + return schema.deserialize(csv.getBytes()); + } +} diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeserializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeserializationSchemaTest.java deleted file mode 100644 index 7e63a4ad852..00000000000 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeserializationSchemaTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.csv; - -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.api.Types; -import org.apache.flink.types.Row; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import java.sql.Timestamp; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -/** - * Testing for {@link CsvRowDeserializationSchema}. - */ -public class CsvRowDeserializationSchemaTest extends TestLogger { - - @Test - public void testDeserialize() throws IOException { - final long currentMills = System.currentTimeMillis(); - final TypeInformation rowTypeInfo = Types.ROW( - new String[]{"a", "b", "c", "d", "e", "f", "g"}, - new TypeInformation[]{ - Types.STRING(), Types.LONG(), Types.DECIMAL(), - Types.ROW( - new String[]{"c1", "c2", "c3"}, - new TypeInformation[]{ - Types.INT(), - PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, - Types.STRING() - } - ), Types.SQL_TIMESTAMP(), Types.BOOLEAN(), - PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO - } - ); - - String c1 = "123"; - String c2 = String.valueOf(34L); - String c3 = String.valueOf(1233.2); - String c4 = "1" + ";" + new String("abc".getBytes()) + ";" + "cba"; - String c5 = new Timestamp(currentMills).toString(); - String c6 = "true"; - String c7 = new String("12345".getBytes()); - byte[] bytes = (c1 + "," + c2 + "," + c3 + "," + c4 + "," - + c5 + "," + c6 + "," + c7).getBytes(); - CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo); - Row deserializedRow = deserializationSchema.deserialize(bytes); - - assertEquals(7, deserializedRow.getArity()); - assertEquals("123", deserializedRow.getField(0)); - assertEquals(34L, deserializedRow.getField(1)); - assertEquals(BigDecimal.valueOf(1233.2), deserializedRow.getField(2)); - assertArrayEquals("abc".getBytes(), (byte[]) ((Row) deserializedRow.getField(3)).getField(1)); - assertEquals(new Timestamp(currentMills), deserializedRow.getField(4)); - assertEquals(true, deserializedRow.getField(5)); - assertArrayEquals("12345".getBytes(), (byte[]) deserializedRow.getField(6)); - } - - @Test - public void testCustomizedProperties() throws IOException { - final TypeInformation rowTypeInfo = Types.ROW( - new String[]{"a", "b", "c"}, - new TypeInformation[]{Types.STRING(), Types.STRING(), - Types.ROW( - new String[]{"c1", "c2"}, - new TypeInformation[]{Types.INT(), Types.STRING()} - )} - ); - - String c1 = "123*\"4"; - String c2 = "'a,bc'"; - String c3 = "1:zxv"; - - byte[] bytes = (c1 + "," + c2 + "," + c3).getBytes(); - CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo); - deserializationSchema.setEscapeCharacter('*'); - deserializationSchema.setQuoteCharacter('\''); - deserializationSchema.setArrayElementDelimiter(":"); - Row deserializedRow = deserializationSchema.deserialize(bytes); - - assertEquals("123\"4", deserializedRow.getField(0)); - assertEquals("a,bc", deserializedRow.getField(1)); - assertEquals("zxv", ((Row) deserializedRow.getField(2)).getField(1)); - } - - @Test - public void testCharset() throws IOException { - final TypeInformation rowTypeInfo = Types.ROW( - new 
String[]{"a"}, - new TypeInformation[]{PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO} - ); - final CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema(rowTypeInfo); - schema.setCharset("UTF-16"); - - byte[] bytes = "abc".getBytes(StandardCharsets.UTF_16); - Row result = schema.deserialize(bytes); - - assertEquals("abc", new String((byte[]) result.getField(0), StandardCharsets.UTF_16)); - } - - @Test - public void testNull() throws IOException { - final TypeInformation rowTypeInfo = Types.ROW( - new String[]{"a"}, - new TypeInformation[]{Types.STRING()} - ); - - final byte[] bytes = "123".getBytes(); - - final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo); - deserializationSchema.setNullValue("123"); - final Row row = deserializationSchema.deserialize(bytes); - assertNull(row.getField(0)); - } - - @Test(expected = IllegalArgumentException.class) - public void testNumberOfFieldNamesAndTypesMismatch() { - TypeInformation rowTypeInfo = Types.ROW( - new String[]{"a", "b"}, - new TypeInformation[]{Types.STRING()} - ); - new CsvRowDeserializationSchema(rowTypeInfo); - } - -} diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowFormatFactoryTest.java index 9d8263f0ac3..bc2fb07222f 100644 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowFormatFactoryTest.java +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowFormatFactoryTest.java @@ -21,11 +21,9 @@ package org.apache.flink.formats.csv; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.api.NoMatchingTableFactoryException; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.descriptors.Csv; -import org.apache.flink.table.descriptors.Descriptor; -import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.table.factories.DeserializationSchemaFactory; import org.apache.flink.table.factories.SerializationSchemaFactory; @@ -35,16 +33,16 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; /** - * Testing for {@link CsvRowFormatFactory}. + * Tests for {@link CsvRowFormatFactory}. 
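For readers unfamiliar with the factory discovery exercised by this test, a condensed sketch of the lookup path (illustrative only; the schema and properties are examples and not part of this change):

import java.util.Map;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.types.Row;

public class CsvFormatFactoryLookupSketch {

	public static void main(String[] args) {
		TypeInformation<Row> formatSchema = Types.ROW_NAMED(
			new String[]{"a", "b"}, Types.STRING, Types.INT);

		// descriptor properties (a table environment would assemble these the same way)
		Map<String, String> properties = new Csv()
			.schema(formatSchema)
			.fieldDelimiter(';')
			.toProperties();

		// the CSV factory is discovered via "format.type" = "csv"
		SerializationSchema<?> ser = TableFactoryService
			.find(SerializationSchemaFactory.class, properties)
			.createSerializationSchema(properties);

		DeserializationSchema<?> deser = TableFactoryService
			.find(DeserializationSchemaFactory.class, properties)
			.createDeserializationSchema(properties);

		System.out.println(ser.getClass() + " / " + deser.getClass());
	}
}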
*/ public class CsvRowFormatFactoryTest extends TestLogger { - private static final TypeInformation SCHEMA = Types.ROW( new String[]{"a", "b", "c"}, new TypeInformation[]{Types.STRING(), Types.INT(), Types.ROW( @@ -55,67 +53,69 @@ public class CsvRowFormatFactoryTest extends TestLogger { @Test public void testSchema() { - final Map properties = toMap( - new Csv() - .field("a", Types.STRING()) - .field("b", Types.INT()) - .field("c", Types.ROW( - new String[]{"a", "b", "c"}, - new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()} - )) - .arrayElementDelim("^^") - .escapeCharacter('c') - ); - testSchemaSerializationSchema(properties); - testSchemaDeserializationSchema(properties); - } - - @Test - public void testDerived() { - final Map properties = toMap( - new Schema() - .field("a", Types.STRING()) - .field("b", Types.INT()) - .field("c", Types.ROW( - new String[]{"a", "b", "c"}, - new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()} - )), new Csv().derived(true) - ); - testSchemaSerializationSchema(properties); - testSchemaDeserializationSchema(properties); - } + final Map properties = new Csv() + .schema(SCHEMA) + .fieldDelimiter(';') + .lineDelimiter("\r\n") + .quoteCharacter('\'') + .allowComments() + .ignoreParseErrors() + .arrayElementDelimiter("|") + .escapeCharacter('\\') + .nullLiteral("n/a") + .toProperties(); - @Test(expected = NoMatchingTableFactoryException.class) - public void testUnsupportedProperties() { - final Map properties = toMap( - new Csv() - .field("a", Types.STRING()) - .lineDelimiter("%") - ); - testSchemaSerializationSchema(properties); - } + final CsvRowDeserializationSchema expectedDeser = new CsvRowDeserializationSchema.Builder(SCHEMA) + .setFieldDelimiter(';') + .setQuoteCharacter('\'') + .setAllowComments(true) + .setIgnoreParseErrors(true) + .setArrayElementDelimiter("|") + .setEscapeCharacter('\\') + .setNullLiteral("n/a") + .build(); - private void testSchemaDeserializationSchema(Map properties) { - final DeserializationSchema actual2 = TableFactoryService + final DeserializationSchema actualDeser = TableFactoryService .find(DeserializationSchemaFactory.class, properties) .createDeserializationSchema(properties); - final CsvRowDeserializationSchema expected2 = new CsvRowDeserializationSchema(SCHEMA); - assertEquals(expected2, actual2); - } - private void testSchemaSerializationSchema(Map properties) { - final SerializationSchema actual1 = TableFactoryService + assertEquals(expectedDeser, actualDeser); + + final CsvRowSerializationSchema expectedSer = new CsvRowSerializationSchema.Builder(SCHEMA) + .setFieldDelimiter(';') + .setLineDelimiter("\r\n") + .setQuoteCharacter('\'') + .setArrayElementDelimiter("|") + .setEscapeCharacter('\\') + .setNullLiteral("n/a") + .build(); + + final SerializationSchema actualSer = TableFactoryService .find(SerializationSchemaFactory.class, properties) .createSerializationSchema(properties); - final SerializationSchema expected1 = new CsvRowSerializationSchema(SCHEMA); - assertEquals(expected1, actual1); + + assertEquals(expectedSer, actualSer); } - private static Map toMap(Descriptor... 
desc) { - final DescriptorProperties descriptorProperties = new DescriptorProperties(true); - for (Descriptor d : desc) { - d.addProperties(descriptorProperties); - } - return descriptorProperties.asMap(); + @Test + public void testSchemaDerivation() { + final Map properties = new HashMap<>(); + properties.putAll(new Schema().schema(TableSchema.fromTypeInfo(SCHEMA)).toProperties()); + properties.putAll(new Csv().deriveSchema().toProperties()); + + final CsvRowSerializationSchema expectedSer = new CsvRowSerializationSchema.Builder(SCHEMA).build(); + final CsvRowDeserializationSchema expectedDeser = new CsvRowDeserializationSchema.Builder(SCHEMA).build(); + + final SerializationSchema actualSer = TableFactoryService + .find(SerializationSchemaFactory.class, properties) + .createSerializationSchema(properties); + + assertEquals(expectedSer, actualSer); + + final DeserializationSchema actualDeser = TableFactoryService + .find(DeserializationSchemaFactory.class, properties) + .createDeserializationSchema(properties); + + assertEquals(expectedDeser, actualDeser); } } diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSchemaConverterTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSchemaConverterTest.java deleted file mode 100644 index e9635fb6887..00000000000 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSchemaConverterTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.csv; - -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.util.TestLogger; - -import com.fasterxml.jackson.dataformat.csv.CsvSchema; -import com.fasterxml.jackson.dataformat.csv.CsvSchema.ColumnType; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** - * testing for {@link CsvRowSchemaConverter}. 
- */ -public class CsvRowSchemaConverterTest extends TestLogger { - - @Test - public void testRowToCsvSchema() { - RowTypeInfo rowTypeInfo = new RowTypeInfo( - new TypeInformation[] { - Types.STRING, - Types.LONG, - Types.ROW(Types.STRING), - Types.BIG_DEC, - Types.BOOLEAN, - BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, - PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO - }, - new String[]{"a", "b", "c", "d", "e", "f", "g"} - ); - CsvSchema expect = CsvSchema.builder() - .addColumn("a", ColumnType.STRING) - .addColumn("b", ColumnType.NUMBER) - .addColumn("c", ColumnType.ARRAY) - .addColumn("d", ColumnType.NUMBER) - .addColumn("e", ColumnType.BOOLEAN) - .addColumn("f", ColumnType.ARRAY) - .addColumn("g", ColumnType.STRING) - .build(); - CsvSchema actual = CsvRowSchemaConverter.rowTypeToCsvSchema(rowTypeInfo); - assertEquals(expect.toString(), actual.toString()); - } - - @Test(expected = RuntimeException.class) - public void testUnsupportedType() { - CsvRowSchemaConverter.rowTypeToCsvSchema(new RowTypeInfo( - new TypeInformation[]{Types.STRING, - PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO}, - new String[]{"a", "b"} - )); - } - -} diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSerializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSerializationSchemaTest.java deleted file mode 100644 index 72b5fa9795b..00000000000 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowSerializationSchemaTest.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.csv; - -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.api.Types; -import org.apache.flink.types.Row; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import java.sql.Timestamp; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -/** - * Testing for {@link CsvRowSerializationSchema}. 
- */ -public class CsvRowSerializationSchemaTest extends TestLogger { - - @Test - public void testSerializeAndDeserialize() throws IOException { - final String[] fields = new String[]{"a", "b", "c", "d", "e", "f", "g", "h"}; - final TypeInformation[] types = new TypeInformation[]{ - Types.BOOLEAN(), Types.STRING(), Types.INT(), Types.DECIMAL(), - Types.SQL_TIMESTAMP(), PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, - Types.ROW( - new String[]{"g1", "g2"}, - new TypeInformation[]{Types.STRING(), Types.LONG()}), - Types.STRING() - }; - final TypeInformation rowSchema = Types.ROW(fields, types); - final Row row = new Row(8); - final Row nestedRow = new Row(2); - nestedRow.setField(0, "z\"xcv"); - nestedRow.setField(1, 123L); - row.setField(0, true); - row.setField(1, "abcd"); - row.setField(2, 1); - row.setField(3, BigDecimal.valueOf(1.2334)); - row.setField(4, new Timestamp(System.currentTimeMillis())); - row.setField(5, "qwecxcr".getBytes()); - row.setField(6, nestedRow); - row.setField(7, null); - - final Row resultRow = serializeAndDeserialize(rowSchema, row); - assertEquals(row, resultRow); - } - - @Test - public void testSerialize() { - long currentMillis = System.currentTimeMillis(); - Row row = new Row(4); - Row nestedRow = new Row(2); - row.setField(0, "abc"); - row.setField(1, 34); - row.setField(2, new Timestamp(currentMillis)); - nestedRow.setField(0, "bc"); - nestedRow.setField(1, "qwertyu".getBytes()); - row.setField(3, nestedRow); - - final TypeInformation typeInfo = Types.ROW( - new String[]{"a", "b", "c", "d"}, - new TypeInformation[]{Types.STRING(), Types.INT(), Types.SQL_TIMESTAMP(), - Types.ROW( - new String[]{"d1", "d2"}, - new TypeInformation[]{Types.STRING(), PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO} - )} - ); - - final CsvRowSerializationSchema schema = new CsvRowSerializationSchema(typeInfo); - byte[] result = schema.serialize(row); - String c1 = "abc"; - String c2 = String.valueOf(34); - String c3 = "\"" + new Timestamp(currentMillis).toString() + "\""; - String c4 = "bc;" + new String("qwertyu".getBytes()); - byte[] expect = (c1 + "," + c2 + "," + c3 + "," + c4 + "\n").getBytes(); - assertArrayEquals(expect, result); - } - - @Test - public void testCustomizedProperties() throws IOException { - final TypeInformation rowTypeInfo = Types.ROW( - new String[]{"a", "b", "c"}, - new TypeInformation[]{Types.STRING(), Types.STRING(), - Types.ROW( - new String[]{"c1", "c2"}, - new TypeInformation[]{Types.INT(), Types.STRING()} - )} - ); - - final Row row = new Row(3); - final Row nestedRow = new Row(2); - nestedRow.setField(0, 1); - nestedRow.setField(1, "zxv"); - row.setField(0, "12*3'4"); - row.setField(1, "a,bc"); - row.setField(2, nestedRow); - - final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo); - serializationSchema.setEscapeCharacter('*'); - serializationSchema.setQuoteCharacter('\''); - serializationSchema.setArrayElementDelimiter(":"); - byte[] result = serializationSchema.serialize(row); - - final String c1 = "'12**3''4'"; - final String c2 = "'a,bc'"; - final String c3 = "1:zxv"; - byte[] expect = (c1 + "," + c2 + "," + c3 + "\n").getBytes(); - assertArrayEquals(expect, result); - } - - @Test - public void testCharset() throws UnsupportedEncodingException { - final TypeInformation rowTypeInfo = Types.ROW( - new String[]{"a"}, - new TypeInformation[]{PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO} - ); - final CsvRowSerializationSchema serializationSchema = new 
CsvRowSerializationSchema(rowTypeInfo); - serializationSchema.setCharset("UTF-16"); - - final Row row = new Row(1); - row.setField(0, "123".getBytes(StandardCharsets.UTF_16)); - byte[] result = serializationSchema.serialize(row); - byte[] expect = "123\n".getBytes(); - - assertArrayEquals(expect, result); - } - - @Test - public void testSerializationOfTwoRows() throws IOException { - final TypeInformation rowSchema = Types.ROW( - new String[] {"f1", "f2", "f3"}, - new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()}); - - final Row row1 = new Row(3); - row1.setField(0, 1); - row1.setField(1, true); - row1.setField(2, "str"); - - final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema); - final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowSchema); - - byte[] bytes = serializationSchema.serialize(row1); - assertEquals(row1, deserializationSchema.deserialize(bytes)); - - final Row row2 = new Row(3); - row2.setField(0, 10); - row2.setField(1, false); - row2.setField(2, "newStr"); - - bytes = serializationSchema.serialize(row2); - assertEquals(row2, deserializationSchema.deserialize(bytes)); - } - - @Test - public void testNull() { - final TypeInformation rowTypeInfo = Types.ROW( - new String[]{"a"}, - new TypeInformation[]{Types.STRING()} - ); - final Row row = new Row(1); - row.setField(0, null); - - final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo); - serializationSchema.setNullValue("123"); - byte[] bytes = serializationSchema.serialize(row); - assertArrayEquals("123\n".getBytes(), bytes); - } - - @Test(expected = RuntimeException.class) - public void testSerializeRowWithInvalidNumberOfFields() { - final TypeInformation rowSchema = Types.ROW( - new String[] {"f1", "f2", "f3"}, - new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()}); - final Row row = new Row(1); - row.setField(0, 1); - - final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema); - serializationSchema.serialize(row); - } - - @Test(expected = RuntimeException.class) - public void testSerializeNestedRowInNestedRow() { - final TypeInformation rowSchema = Types.ROW( - new String[]{"a"}, - new TypeInformation[]{Types.ROW( - new String[]{"a1"}, - new TypeInformation[]{Types.ROW( - new String[]{"a11"}, - new TypeInformation[]{Types.STRING()} - )} - )} - ); - final Row row = new Row(1); - final Row nestedRow = new Row(1); - final Row doubleNestedRow = new Row(1); - doubleNestedRow.setField(0, "123"); - nestedRow.setField(0, doubleNestedRow); - row.setField(0, nestedRow); - final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema); - serializationSchema.serialize(row); - } - - private Row serializeAndDeserialize(TypeInformation rowSchema, Row row) throws IOException { - final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema); - final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowSchema); - - final byte[] bytes = serializationSchema.serialize(row); - return deserializationSchema.deserialize(bytes); - } -} diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/table/descriptors/CsvTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/table/descriptors/CsvTest.java new file mode 100644 index 00000000000..7fe22509e56 --- /dev/null +++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/table/descriptors/CsvTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for the {@link Csv} descriptor. + */ +public class CsvTest extends DescriptorTestBase { + + private static final TypeInformation<Row> SCHEMA = Types.ROW( + new String[]{"a", "b", "c"}, + new TypeInformation[]{Types.STRING(), Types.INT(), Types.ROW( + new String[]{"a", "b", "c"}, + new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()} + )} + ); + + private static final Descriptor CUSTOM_DESCRIPTOR_WITH_SCHEMA = new Csv() + .schema(SCHEMA) + .fieldDelimiter(';') + .lineDelimiter("\r\n") + .quoteCharacter('\'') + .allowComments() + .ignoreParseErrors() + .arrayElementDelimiter("|") + .escapeCharacter('\\') + .nullLiteral("n/a"); + + private static final Descriptor MINIMAL_DESCRIPTOR_WITH_DERIVED_SCHEMA = new Csv() + .deriveSchema(); + + @Test(expected = ValidationException.class) + public void testInvalidAllowComments() { + addPropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.allow-comments", "DDD"); + } + + @Test(expected = ValidationException.class) + public void testMissingSchema() { + removePropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.schema"); + } + + @Test(expected = ValidationException.class) + public void testDuplicateSchema() { + // we add an additional schema + addPropertyAndVerify( + MINIMAL_DESCRIPTOR_WITH_DERIVED_SCHEMA, + "format.schema", + "ROW<a VARCHAR, b INT, c ROW<a VARCHAR, b INT, c BOOLEAN>>"); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public List<Descriptor> descriptors() { + return Arrays.asList(CUSTOM_DESCRIPTOR_WITH_SCHEMA, MINIMAL_DESCRIPTOR_WITH_DERIVED_SCHEMA); + } + + @Override + public List<Map<String, String>> properties() { + final Map<String, String> props1 = new HashMap<>(); + props1.put("format.type", "csv"); + props1.put("format.property-version", "1"); + props1.put("format.schema", "ROW<a VARCHAR, b INT, c ROW<a VARCHAR, b INT, c BOOLEAN>>"); + props1.put("format.field-delimiter", ";"); + props1.put("format.line-delimiter", "\r\n"); + props1.put("format.quote-character", "'"); + props1.put("format.allow-comments", "true"); + props1.put("format.ignore-parse-errors", "true"); + props1.put("format.array-element-delimiter", "|"); + props1.put("format.escape-character", "\\"); + props1.put("format.null-literal", "n/a"); + + final Map<String, String> props2 = new HashMap<>(); + props2.put("format.type", "csv"); +
props2.put("format.property-version", "1"); + props2.put("format.derive-schema", "true"); + + return Arrays.asList(props1, props2); + } + + @Override + public DescriptorValidator validator() { + return new CsvValidator(); + } +} diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml index dd52836743c..f8191b16604 100644 --- a/flink-formats/flink-json/pom.xml +++ b/flink-formats/flink-json/pom.xml @@ -60,11 +60,6 @@ under the License. - - org.apache.flink - flink-test-utils-junit - - org.apache.flink diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java index dc8a116ac62..df1fbc5d8e1 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java @@ -46,7 +46,7 @@ import java.util.Objects; *
<p>Deserializes a byte[] message as a JSON object and reads * the specified fields. * - *
<p>Failure during deserialization are forwarded as wrapped IOExceptions. + *
<p>Failures during deserialization are forwarded as wrapped IOExceptions. */ @PublicEvolving public class JsonRowDeserializationSchema implements DeserializationSchema<Row> { diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java index 3c4888c96b0..7bc7cfbf054 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java @@ -93,11 +93,11 @@ public class Json extends FormatDescriptor { } /** - * Derives the format schema from the table's schema described using {@link Schema}. + * Derives the format schema from the table's schema. + * *
<p>This allows for defining schema information only once. * - *
<p>The names, types, and field order of the format are determined by the table's + *
<p>
The names, types, and fields' order of the format are determined by the table's * schema. Time attributes are ignored if their origin is not a field. A "from" definition * is interpreted as a field renaming in the format. */ diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml index a236770e26c..3b8e9d277c9 100644 --- a/flink-formats/pom.xml +++ b/flink-formats/pom.xml @@ -58,6 +58,7 @@ under the License. org.apache.flink flink-test-utils-junit + test diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala similarity index 74% rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala rename to flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala index ebd6e6460aa..031e757b466 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Csv.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala @@ -22,7 +22,7 @@ import java.util import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.{TableSchema, ValidationException} -import org.apache.flink.table.descriptors.CsvValidator._ +import org.apache.flink.table.descriptors.OldCsvValidator._ import org.apache.flink.table.utils.TypeStringUtils import scala.collection.mutable @@ -30,8 +30,18 @@ import scala.collection.JavaConverters._ /** * Format descriptor for comma-separated values (CSV). + * + * Note: This descriptor describes Flink's non-standard CSV table source/sink. In the future, the + * descriptor will be replaced by a proper RFC-compliant version. Use the RFC-compliant `Csv` + * format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use the + * old one for stream/batch filesystem operations for now. + * + * @deprecated Use the RFC-compliant `Csv` format in the dedicated + * `flink-formats/flink-csv` module instead when writing to Kafka. */ -class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { +@Deprecated +@deprecated +class OldCsv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { private var fieldDelim: Option[String] = None private var lineDelim: Option[String] = None @@ -41,17 +51,13 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { private var commentPrefix: Option[String] = None private var isIgnoreFirstLine: Option[Boolean] = None private var lenient: Option[Boolean] = None - private var arrayElementDelim: Option[String] = None - private var escapeCharacter: Option[Character] = None - private var bytesCharset: Option[String] = None - private var derived: Option[Boolean] = None /** * Sets the field delimiter, "," by default. 
* * @param delim the field delimiter */ - def fieldDelimiter(delim: String): Csv = { + def fieldDelimiter(delim: String): OldCsv = { this.fieldDelim = Some(delim) this } @@ -61,7 +67,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { * * @param delim the line delimiter */ - def lineDelimiter(delim: String): Csv = { + def lineDelimiter(delim: String): OldCsv = { this.lineDelim = Some(delim) this } @@ -74,7 +80,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { * * @param schema the table schema */ - def schema(schema: TableSchema): Csv = { + def schema(schema: TableSchema): OldCsv = { this.schema.clear() schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) => field(n, t) @@ -90,7 +96,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { * @param fieldName the field name * @param fieldType the type information of the field */ - def field(fieldName: String, fieldType: TypeInformation[_]): Csv = { + def field(fieldName: String, fieldType: TypeInformation[_]): OldCsv = { field(fieldName, TypeStringUtils.writeTypeInfo(fieldType)) this } @@ -103,7 +109,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { * @param fieldName the field name * @param fieldType the type string of the field */ - def field(fieldName: String, fieldType: String): Csv = { + def field(fieldName: String, fieldType: String): OldCsv = { if (schema.contains(fieldName)) { throw new ValidationException(s"Duplicate field name $fieldName.") } @@ -116,7 +122,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { * * @param quote the quote character */ - def quoteCharacter(quote: Character): Csv = { + def quoteCharacter(quote: Character): OldCsv = { this.quoteCharacter = Option(quote) this } @@ -126,7 +132,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { * * @param prefix the prefix to indicate comments */ - def commentPrefix(prefix: String): Csv = { + def commentPrefix(prefix: String): OldCsv = { this.commentPrefix = Option(prefix) this } @@ -134,7 +140,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { /** * Ignore the first line. Not skip the first line by default. */ - def ignoreFirstLine(): Csv = { + def ignoreFirstLine(): OldCsv = { this.isIgnoreFirstLine = Some(true) this } @@ -142,45 +148,14 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { /** * Skip records with parse error instead to fail. Throw an exception by default. */ - def ignoreParseErrors(): Csv = { + def ignoreParseErrors(): OldCsv = { this.lenient = Some(true) this } - /** - * Set delimiter of array elements, ';' by default. - */ - def arrayElementDelim(delim: String): Csv = { - this.arrayElementDelim = Some(delim) - this - } - - /** - * Set escape character, none by default. - */ - def escapeCharacter(escape: Character): Csv = { - this.escapeCharacter = Some(escape) - this - } - - /** - * Set charset of byte[], 'UTF-8' by defaut. - */ - def bytesCharset(charset: String): Csv = { - this.bytesCharset = Some(charset) - this - } - - /** - * Set true if format schema derives from table schema. 
- */ - def derived(derived: Boolean): Csv = { - this.derived = Some(derived) - this - } - override protected def toFormatProperties: util.Map[String, String] = { val properties = new DescriptorProperties() + fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _)) lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _)) @@ -206,14 +181,20 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) { /** * Format descriptor for comma-separated values (CSV). */ -object Csv { +object OldCsv { /** * Format descriptor for comma-separated values (CSV). * - * @deprecated Use `new Csv()`. + * Note: This descriptor describes Flink's non-standard CSV table source/sink. In the future, the + * descriptor will be replaced by a proper RFC-compliant version. Use the RFC-compliant `Csv` + * format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use + * the old one for stream/batch filesystem operations for now. + * + * @deprecated Use the RFC-compliant `Csv` format in the dedicated + * `flink-formats/flink-csv` module instead when writing to Kafka. */ @deprecated - def apply(): Csv = new Csv() + def apply(): OldCsv = new OldCsv() } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsvValidator.scala similarity index 57% rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala rename to flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsvValidator.scala index 97db7fa49bc..87c01a2675b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsvValidator.scala @@ -18,14 +18,18 @@ package org.apache.flink.table.descriptors -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.descriptors.CsvValidator._ +import org.apache.flink.table.descriptors.OldCsvValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE /** - * Validator for [[Csv]]. + * Validator for [[OldCsv]]. + * + * @deprecated Use the RFC-compliant `Csv` format in the dedicated + * `flink-formats/flink-csv` module instead. 
*/ -class CsvValidator extends FormatDescriptorValidator { +@Deprecated +@deprecated +class OldCsvValidator extends FormatDescriptorValidator { override def validate(properties: DescriptorProperties): Unit = { super.validate(properties) @@ -34,30 +38,13 @@ class CsvValidator extends FormatDescriptorValidator { properties.validateString(FORMAT_LINE_DELIMITER, true, 1) properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1) properties.validateString(FORMAT_COMMENT_PREFIX, true, 1) - properties.validateString(FORMAT_ARRAY_ELEMENT_DELIMITER, true, 1) - properties.validateString(FORMAT_ESCAPE_CHARACTER, true, 1, 1) - properties.validateString(FORMAT_BYTES_CHARSET, true, 1) - properties.validateString(FORMAT_NULL_VALUE, true, 1) properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, true) properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true) - properties.validateBoolean(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA, true) - - val tableSchema = properties.getOptionalTableSchema(FORMAT_FIELDS) - val derived = properties.getOptionalBoolean( - FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA).orElse(false) - if (derived && tableSchema.isPresent) { - throw new ValidationException( - "Format cannot define a schema and derive from the table's schema at the same time.") - } else if (tableSchema.isPresent) { - properties.validateTableSchema(FORMAT_FIELDS, false) - } else if (!tableSchema.isPresent && derived) { - throw new ValidationException( - "A definition of a schema or derive from the table's schema is required.") - } + properties.validateTableSchema(FORMAT_FIELDS, false) } } -object CsvValidator { +object OldCsvValidator { val FORMAT_TYPE_VALUE = "csv" val FORMAT_FIELD_DELIMITER = "format.field-delimiter" @@ -67,8 +54,4 @@ object CsvValidator { val FORMAT_IGNORE_FIRST_LINE = "format.ignore-first-line" val FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors" val FORMAT_FIELDS = "format.fields" - val FORMAT_ARRAY_ELEMENT_DELIMITER = "format.array-element-delimiter" - val FORMAT_ESCAPE_CHARACTER = "format.escape-character" - val FORMAT_BYTES_CHARSET = "format.bytes-charset" - val FORMAT_NULL_VALUE = "format.null-value" } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala index 5ee9f4ea565..0892abaa9c9 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala @@ -22,7 +22,7 @@ import java.util import org.apache.flink.table.api.TableException import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ -import org.apache.flink.table.descriptors.CsvValidator._ +import org.apache.flink.table.descriptors.OldCsvValidator._ import org.apache.flink.table.descriptors.FileSystemValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.SchemaValidator._ @@ -69,7 +69,7 @@ abstract class CsvTableSinkFactoryBase extends TableFactory { // validate new FileSystemValidator().validate(params) - new CsvValidator().validate(params) + new OldCsvValidator().validate(params) new SchemaValidator( isStreaming, supportsSourceTimestamps = false, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala index 4a257bc5077..232dd3566c3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala @@ -22,7 +22,7 @@ import java.util import org.apache.flink.table.api.TableException import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE} -import org.apache.flink.table.descriptors.CsvValidator._ +import org.apache.flink.table.descriptors.OldCsvValidator._ import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE} import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE} import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA @@ -74,7 +74,7 @@ abstract class CsvTableSourceFactoryBase extends TableFactory { // validate new FileSystemValidator().validate(params) - new CsvValidator().validate(params) + new OldCsvValidator().validate(params) new SchemaValidator( isStreaming, supportsSourceTimestamps = false, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala similarity index 88% rename from flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala rename to flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala index 1c01e712e3a..11931849237 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/OldCsvTest.scala @@ -28,9 +28,9 @@ import org.junit.Test import scala.collection.JavaConverters._ /** - * Tests for [[Csv]]. + * Tests for [[OldCsv]]. 
*/ -class CsvTest extends DescriptorTestBase { +class OldCsvTest extends DescriptorTestBase { @Test(expected = classOf[ValidationException]) def testInvalidType(): Unit = { @@ -47,20 +47,10 @@ class CsvTest extends DescriptorTestBase { addPropertyAndVerify(descriptors().get(0), "format.quote-character", "qq") } - @Test(expected = classOf[ValidationException]) - def testTwoSchemas(): Unit = { - addPropertyAndVerify(descriptors().get(0), "format.derive-schema", "true") - } - - @Test - def testOneSchema(): Unit = { - addPropertyAndVerify(descriptors().get(0), "format.derive-schema", "false") - } - // ---------------------------------------------------------------------------------------------- override def descriptors(): util.List[Descriptor] = { - val desc1 = Csv() + val desc1 = OldCsv() .field("field1", "STRING") .field("field2", Types.SQL_TIMESTAMP) .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]])) @@ -69,7 +59,7 @@ class CsvTest extends DescriptorTestBase { Array[TypeInformation[_]](Types.INT, Types.STRING))) .lineDelimiter("^") - val desc2 = Csv() + val desc2 = OldCsv() .schema(new TableSchema( Array[String]("test", "row"), Array[TypeInformation[_]](Types.INT, Types.STRING))) @@ -107,6 +97,6 @@ class CsvTest extends DescriptorTestBase { } override def validator(): DescriptorValidator = { - new CsvValidator() + new OldCsvValidator() } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala index dcc1ab94852..c5fa8f6d525 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala @@ -59,7 +59,7 @@ class TableDescriptorTest extends TableTestBase { val connector = FileSystem() .path("/path/to/csv") - val format = Csv() + val format = OldCsv() .field("myfield", Types.STRING) .field("myfield2", Types.INT) .field("myfield3", Types.MAP(Types.STRING, Types.INT)) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala index 1209595837b..198583849e2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala @@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.table.api.TableSchema import org.apache.flink.table.catalog._ -import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} +import org.apache.flink.table.descriptors.{OldCsv, FileSystem, Schema} import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource} object CommonTestData { @@ -71,7 +71,7 @@ object CommonTestData { val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"), "csv-test1", "tmp") val connDesc1 = FileSystem().path(tempFilePath1) - val formatDesc1 = Csv() + val formatDesc1 = OldCsv() .field("a", Types.INT) .field("b", Types.LONG) .field("c", Types.STRING) @@ -110,7 +110,7 @@ object CommonTestData { val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"), "csv-test2", "tmp") val connDesc2 = 
FileSystem().path(tempFilePath2) - val formatDesc2 = Csv() + val formatDesc2 = OldCsv() .field("d", Types.INT) .field("e", Types.LONG) .field("f", Types.INT) @@ -135,7 +135,7 @@ object CommonTestData { val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp") val connDesc3 = FileSystem().path(tempFilePath3) - val formatDesc3 = Csv() + val formatDesc3 = OldCsv() .field("x", Types.INT) .field("y", Types.LONG) .field("z", Types.STRING) diff --git a/tools/travis/stage.sh b/tools/travis/stage.sh index 683dd1312c3..da74ca24f7c 100644 --- a/tools/travis/stage.sh +++ b/tools/travis/stage.sh @@ -66,6 +66,8 @@ flink-filesystems/flink-s3-fs-hadoop,\ flink-filesystems/flink-s3-fs-presto,\ flink-formats/flink-avro,\ flink-formats/flink-parquet,\ +flink-formats/flink-json,\ +flink-formats/flink-csv,\ flink-connectors/flink-hbase,\ flink-connectors/flink-hcatalog,\ flink-connectors/flink-hadoop-compatibility,\ -- GitLab
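
For illustration only, and not part of the patch itself: a minimal sketch of how the new RFC-compliant Csv descriptor could be wired into a Kafka-backed table source through the Table API. Only the Csv builder calls are taken from the API exercised in CsvTest above; the Kafka connector settings (version, topic, bootstrap servers), the schema fields, and the table name are hypothetical placeholders.

// Illustrative sketch: using the new flink-csv format descriptor with a Kafka connector.
// Connector details and names below are assumptions, not values taken from this patch.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class CsvFormatUsageSketch {

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

		tableEnv
			// hypothetical Kafka connector setup; requires flink-connector-kafka on the classpath
			.connect(new Kafka()
				.version("universal")
				.topic("csv-input")
				.property("bootstrap.servers", "localhost:9092")
				.startFromEarliest())
			// the new RFC-compliant CSV format from flink-formats/flink-csv
			.withFormat(new Csv()
				.fieldDelimiter(';')
				.ignoreParseErrors()
				// derive the format schema from the table schema defined below
				.deriveSchema())
			.withSchema(new Schema()
				.field("a", Types.STRING())
				.field("b", Types.INT()))
			.inAppendMode()
			.registerTableSource("CsvSource");
	}
}

Where the format schema differs from the table schema, schema(TypeInformation) can be used instead of deriveSchema(), as the CUSTOM_DESCRIPTOR_WITH_SCHEMA case in CsvTest shows.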