Commit 3d07a97c authored by Timo Walther

[FLINK-9964][table] Add a full CSV table format factory

It adds CsvRowSerializationSchema, CsvRowDeserializationSchema, a new CSV descriptor,
CsvRowTableFormatFactory, documentation, and tests.

The format integrates nicely with most SQL types.

It deprecates the "old CSV" descriptor stack and also prepares for FLINK-7050 (#4660).

The old CSV descriptor is still available as "OldCsv".

This closes #7777.
Parent c3277c55
......@@ -52,11 +52,12 @@ The following tables list all available connectors and formats. Their mutual com
### Formats
| Name | Maven dependency | SQL Client JAR |
| :---------------- | :--------------------------- | :--------------------- |
| CSV | Built-in | Built-in |
| JSON | `flink-json` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) |
| Apache Avro | `flink-avro` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) |
| Name | Maven dependency | SQL Client JAR |
| :------------------------- | :--------------------------- | :--------------------- |
| Old CSV (for files) | Built-in | Built-in |
| CSV (for Kafka) | `flink-csv` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-csv/{{site.version}}/flink-csv-{{site.version}}-sql-jar.jar) |
| JSON | `flink-json` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) |
| Apache Avro | `flink-avro` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) |
{% else %}
......@@ -708,21 +709,47 @@ A format tag indicates the format type for matching with a connector.
### CSV Format
The CSV format allows reading and writing comma-separated rows.
<span class="label label-info">Format: Serialization Schema</span>
<span class="label label-info">Format: Deserialization Schema</span>
The CSV format aims to comply with [RFC-4180](https://tools.ietf.org/html/rfc4180) ("Common Format and
MIME Type for Comma-Separated Values (CSV) Files") proposed by the Internet Engineering Task Force (IETF).
The format allows reading and writing CSV data that corresponds to a given format schema. The format schema can be
defined either as a Flink type or derived from the desired table schema.
If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for
defining schema information only once. The names, types, and fields' order of the format are determined by the
table's schema. Time attributes are ignored if their origin is not a field. A `from` definition in the table
schema is interpreted as a field renaming in the format.
The CSV format can be used as follows:
<div class="codetabs" markdown="1">
<div data-lang="Java/Scala" markdown="1">
{% highlight java %}
.withFormat(
new Csv()
.field("field1", Types.STRING) // required: ordered format fields
.field("field2", Types.TIMESTAMP)
.fieldDelimiter(",") // optional: string delimiter "," by default
.lineDelimiter("\n") // optional: string delimiter "\n" by default
.quoteCharacter('"') // optional: single character for string values, empty by default
.commentPrefix('#') // optional: string to indicate comments, empty by default
.ignoreFirstLine() // optional: ignore the first line, by default it is not skipped
.ignoreParseErrors() // optional: skip records with parse error instead of failing by default
// required: define the schema either by using type information
.schema(Type.ROW(...))
// or use the table's schema
.deriveSchema()
.fieldDelimiter(';') // optional: field delimiter character (',' by default)
.lineDelimiter("\r\n") // optional: line delimiter ("\n" by default;
// otherwise "\r" or "\r\n" are allowed)
.quoteCharacter('\'') // optional: quote character for enclosing field values ('"' by default)
.allowComments() // optional: ignores comment lines that start with '#' (disabled by default);
// if enabled, make sure to also ignore parse errors to allow empty rows
.ignoreParseErrors() // optional: skip fields and rows with parse errors instead of failing;
// fields are set to null in case of errors
.arrayElementDelimiter("|") // optional: the array element delimiter string for separating
// array and row element values (";" by default)
.escapeCharacter('\\') // optional: escape character for escaping values (disabled by default)
.nullLiteral("n/a") // optional: null literal string that is interpreted as a
// null value (disabled by default)
)
{% endhighlight %}
</div>
......@@ -731,24 +758,76 @@ The CSV format allows to read and write comma-separated rows.
{% highlight yaml %}
format:
type: csv
fields: # required: ordered format fields
- name: field1
type: VARCHAR
- name: field2
type: TIMESTAMP
field-delimiter: "," # optional: string delimiter "," by default
line-delimiter: "\n" # optional: string delimiter "\n" by default
quote-character: '"' # optional: single character for string values, empty by default
comment-prefix: '#' # optional: string to indicate comments, empty by default
ignore-first-line: false # optional: boolean flag to ignore the first line, by default it is not skipped
ignore-parse-errors: true # optional: skip records with parse error instead of failing by default
# required: define the schema either by using type information
schema: "ROW(lon FLOAT, rideTime TIMESTAMP)"
# or use the table's schema
derive-schema: true
field-delimiter: ";" # optional: field delimiter character (',' by default)
line-delimiter: "\r\n" # optional: line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed)
quote-character: "'" # optional: quote character for enclosing field values ('"' by default)
allow-comments: true # optional: ignores comment lines that start with "#" (disabled by default);
# if enabled, make sure to also ignore parse errors to allow empty rows
ignore-parse-errors: true # optional: skip fields and rows with parse errors instead of failing;
# fields are set to null in case of errors
array-element-delimiter: "|" # optional: the array element delimiter string for separating
# array and row element values (";" by default)
escape-character: "\\" # optional: escape character for escaping values (disabled by default)
null-literal: "n/a" # optional: null literal string that is interpreted as a
# null value (disabled by default)
{% endhighlight %}
</div>
</div>
The CSV format is included in Flink and does not require additional dependencies.
The following table lists supported types that can be read and written:
| Supported Flink SQL Types |
| :------------------------ |
| `ROW` |
| `VARCHAR` |
| `ARRAY[_]` |
| `INT` |
| `BIGINT` |
| `FLOAT` |
| `DOUBLE` |
| `BOOLEAN` |
| `DATE` |
| `TIME` |
| `TIMESTAMP` |
| `DECIMAL` |
| `NULL` (unsupported yet) |
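Beyond the descriptor-based configuration shown above, the schemas added by this change can also be constructed programmatically through their builders. The following is a minimal sketch based on the `CsvRowDeserializationSchema.Builder` introduced in this commit; the field names, types, and chosen options are illustrative only, not defaults.
{% highlight java %}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
import org.apache.flink.types.Row;

// Illustrative row type using SQL types from the table above.
TypeInformation<Row> typeInfo = Types.ROW_NAMED(
    new String[]{"lon", "rideTime"},
    Types.FLOAT, Types.SQL_TIMESTAMP);

// The builder mirrors the optional format properties (field delimiter, parse error handling, ...).
CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema.Builder(typeInfo)
    .setFieldDelimiter(';')
    .setIgnoreParseErrors(true)
    .build();
{% endhighlight %}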
**Numeric types:** Value should be a number but the literal `"null"` can also be understood. An empty string is
considered `null`. Values are also trimmed (leading/trailing white space). Numbers are parsed using
Java's `valueOf` semantics. Other non-numeric strings may cause a parsing exception.
**String and time types:** Value is not trimmed. The literal `"null"` can also be understood. Time types
must be formatted according to the Java SQL time format with millisecond precision. For example:
`2018-01-01` for date, `20:43:59` for time, and `2018-01-01 20:43:59.999` for timestamp.
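For example, these literals correspond to Java SQL time values obtained via `valueOf`, which is also how the runtime converters in this change parse them (a small illustrative sketch):
{% highlight java %}
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

Date date = Date.valueOf("2018-01-01");                       // date
Time time = Time.valueOf("20:43:59");                         // time
Timestamp ts = Timestamp.valueOf("2018-01-01 20:43:59.999");  // timestamp with millisecond precision
{% endhighlight %}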
**Boolean type:** Value is expected to be a boolean (`"true"`, `"false"`) string or `"null"`. Empty strings are
interpreted as `false`. Values are trimmed (leading/trailing white space). Other values result in an exception.
**Nested types:** Array and row types are supported for one level of nesting using the array element delimiter.
**Primitive byte arrays:** Primitive byte arrays are handled in Base64-encoded representation.
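As an illustration (the example values are assumed, not taken from the original documentation), a `byte[]` field therefore appears as Base64 text in the CSV data:
{% highlight java %}
import java.util.Base64;

byte[] payload = new byte[]{1, 2, 3};
// Written as Base64 text in the CSV output ...
String csvValue = Base64.getEncoder().encodeToString(payload);   // "AQID"
// ... and decoded back to byte[] when reading.
byte[] restored = Base64.getDecoder().decode(csvValue);
{% endhighlight %}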
**Line endings:** Line endings need to be considered even for row-based connectors (such as Kafka) so that they
are ignored for unquoted string fields at the end of a row.
**Escaping and quoting:** The following table shows examples of how escaping and quoting affect the parsing
of a string using `*` for escaping and `'` for quoting:
| CSV Field | Parsed String |
| :---------------- | :------------------- |
| `123*'4**` | `123'4*` |
| `'123''4**'` | `123'4*` |
| `'a;b*'c'` | `a;b'c` |
| `'a;b''c'` | `a;b'c` |
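The table assumes `*` as the escape character and `'` as the quote character. Such a setup could be configured on the descriptor roughly as follows (a sketch using the options listed above):
{% highlight java %}
.withFormat(
  new Csv()
    .deriveSchema()
    .quoteCharacter('\'')   // quote character used in the examples above
    .escapeCharacter('*')   // escape character used in the examples above
)
{% endhighlight %}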
<span class="label label-danger">Attention</span> The CSV format for writing rows is limited at the moment. Only a custom field delimiter is supported as optional parameter.
Make sure to add the CSV format as a dependency.
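For a Maven project this would roughly correspond to the following declaration (a sketch; `${flink.version}` is a placeholder for your Flink release, and the artifact name is taken from the formats table at the top of this page):
{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-csv</artifactId>
  <version>${flink.version}</version>
</dependency>
{% endhighlight %}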
### JSON Format
......@@ -757,7 +836,9 @@ The CSV format is included in Flink and does not require additional dependencies
The JSON format allows reading and writing JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink type, as a JSON schema, or derived from the desired table schema. A Flink type enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures.
If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and field order of the format are determined by the table's schema. Time attributes are ignored if their origin is not a field. A `from` definition in the table schema is interpreted as a field renaming in the format.
If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and fields' order of the format are determined by the table's schema. Time attributes are ignored if their origin is not a field. A `from` definition in the table schema is interpreted as a field renaming in the format.
The JSON format can be used as follows:
<div class="codetabs" markdown="1">
<div data-lang="Java/Scala" markdown="1">
......@@ -837,7 +918,6 @@ The following table shows the mapping of JSON schema types to Flink SQL types:
| `string` with `encoding: base64` | `ARRAY[TINYINT]` |
| `null` | `NULL` (unsupported yet)|
Currently, Flink supports only a subset of the [JSON schema specification](http://json-schema.org/) `draft-07`. Union types (as well as `allOf`, `anyOf`, `not`) are not supported yet. `oneOf` and arrays of types are only supported for specifying nullability.
Simple references that link to a common definition in the document are supported as shown in the more complex example below:
......@@ -891,7 +971,6 @@ Simple references that link to a common definition in the document are supported
Make sure to add the JSON format as a dependency.
### Apache Avro Format
<span class="label label-info">Format: Serialization Schema</span>
......@@ -899,6 +978,8 @@ Make sure to add the JSON format as a dependency.
The [Apache Avro](https://avro.apache.org/) format allows reading and writing Avro data that corresponds to a given format schema. The format schema can be defined either as a fully qualified class name of an Avro specific record or as an Avro schema string. If a class name is used, the class must be available in the classpath during runtime.
The Avro format can be used as follows:
<div class="codetabs" markdown="1">
<div data-lang="Java/Scala" markdown="1">
{% highlight java %}
......@@ -975,6 +1056,56 @@ Avro uses [Joda-Time](http://www.joda.org/joda-time/) for representing logical d
Make sure to add the Apache Avro dependency.
### Old CSV Format
<span class="label label-danger">Attention</span> For prototyping purposes only!
The old CSV format allows reading and writing comma-separated rows using the filesystem connector.
This format describes Flink's non-standard CSV table source/sink. In the future, the format will be
replaced by a proper RFC-compliant version. Use the RFC-compliant CSV format when writing to Kafka.
Use the old one for stream/batch filesystem operations for now.
<div class="codetabs" markdown="1">
<div data-lang="Java/Scala" markdown="1">
{% highlight java %}
.withFormat(
new OldCsv()
.field("field1", Types.STRING) // required: ordered format fields
.field("field2", Types.TIMESTAMP)
.fieldDelimiter(",") // optional: string delimiter "," by default
.lineDelimiter("\n") // optional: string delimiter "\n" by default
.quoteCharacter('"') // optional: single character for string values, empty by default
.commentPrefix('#') // optional: string to indicate comments, empty by default
.ignoreFirstLine() // optional: ignore the first line, by default it is not skipped
.ignoreParseErrors() // optional: skip records with parse error instead of failing by default
)
{% endhighlight %}
</div>
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
format:
type: csv
fields: # required: ordered format fields
- name: field1
type: VARCHAR
- name: field2
type: TIMESTAMP
field-delimiter: "," # optional: string delimiter "," by default
line-delimiter: "\n" # optional: string delimiter "\n" by default
quote-character: '"' # optional: single character for string values, empty by default
comment-prefix: '#' # optional: string to indicate comments, empty by default
ignore-first-line: false # optional: boolean flag to ignore the first line, by default it is not skipped
ignore-parse-errors: true # optional: skip records with parse error instead of failing by default
{% endhighlight %}
</div>
</div>
The old CSV format is included in Flink and does not require additional dependencies.
<span class="label label-danger">Attention</span> The old CSV format for writing rows is limited at the moment. Only a custom field delimiter is supported as an optional parameter.
{% top %}
Further TableSources and TableSinks
......
......@@ -165,4 +165,4 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer
Kryo kryo = serializer.getKryo();
assertTrue(kryo.getReferences());
}
}
\ No newline at end of file
}
......@@ -60,6 +60,14 @@ under the License.
<classifier>sql-jar</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<!-- Used by maven-dependency-plugin -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${project.version}</version>
<classifier>sql-jar</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<!-- Used by maven-dependency-plugin -->
<groupId>org.apache.flink</groupId>
......@@ -159,6 +167,13 @@ under the License.
<classifier>sql-jar</classifier>
<type>jar</type>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${project.version}</version>
<classifier>sql-jar</classifier>
<type>jar</type>
</artifactItem>
<!-- This SQL JAR is not used for now to avoid dependency conflicts; see FLINK-10107.
<artifactItem>
<groupId>org.apache.flink</groupId>
......
......@@ -18,6 +18,7 @@
package org.apache.flink.table.descriptors;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import org.apache.avro.specific.SpecificRecord;
......@@ -27,6 +28,7 @@ import java.util.Map;
/**
* Format descriptor for Apache Avro records.
*/
@PublicEvolving
public class Avro extends FormatDescriptor {
private Class<? extends SpecificRecord> recordClass;
......
......@@ -25,7 +25,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-formats</artifactId>
<version>1.7-SNAPSHOT</version>
<version>1.8-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
......@@ -40,49 +40,79 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<artifactId>flink-shaded-jackson</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<!-- use a dedicated Scala version to not depend on it -->
<artifactId>flink-table_2.11</artifactId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<!-- Projects depending on this project, won't depend on flink-table. -->
<optional>true</optional>
</dependency>
<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<version>2.7.9</version>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<!-- CSV table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- TODO This could be dropped if we change CsvRowFormatFactoryTest -->
<dependency>
<groupId>org.apache.flink</groupId>
<!-- use a dedicated Scala version to not depend on it -->
<artifactId>flink-table_2.11</artifactId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- flink-table needs Scala -->
<!-- TODO This could be dropped if we change CsvRowFormatFactoryTest -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<!-- Create SQL Client uber jars by default -->
<profile>
<id>sql-jars</id>
<activation>
<property>
<name>!skipSqlJars</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>sql-jar</classifier>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
......@@ -18,72 +18,153 @@
package org.apache.flink.formats.csv;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Objects;
/**
* Deserialization schema from CSV to Flink types.
*
* <p>Deserializes a <code>byte[]</code> message as a {@link JsonNode} and
* convert it to {@link Row}.
* converts it to {@link Row}.
*
* <p>Failure during deserialization are forwarded as wrapped IOExceptions.
* <p>Failures during deserialization are forwarded as wrapped {@link IOException}s.
*/
@Public
@PublicEvolving
public final class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
/** Schema describing the input csv data. */
private CsvSchema csvSchema;
private static final long serialVersionUID = 2135553495874539201L;
/** Type information describing the input csv data. */
private TypeInformation<Row> rowTypeInfo;
/** Type information describing the result type. */
private final TypeInformation<Row> typeInfo;
/** ObjectReader used to read message, it will be changed when csvSchema is changed. */
private ObjectReader objectReader;
/** Runtime instance that performs the actual work. */
private final RuntimeConverter runtimeConverter;
/** Charset for byte[]. */
private String charset = "UTF-8";
/** Schema describing the input CSV data. */
private final CsvSchema csvSchema;
/** Object reader used to read rows. It is configured by {@link CsvSchema}. */
private final ObjectReader objectReader;
/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
private final boolean ignoreParseErrors;
private CsvRowDeserializationSchema(
RowTypeInfo typeInfo,
CsvSchema csvSchema,
boolean ignoreParseErrors) {
this.typeInfo = typeInfo;
this.runtimeConverter = createRowRuntimeConverter(typeInfo, ignoreParseErrors, true);
this.csvSchema = csvSchema;
this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
this.ignoreParseErrors = ignoreParseErrors;
}
/**
* Create a csv row DeserializationSchema with given {@link TypeInformation}.
* A builder for creating a {@link CsvRowDeserializationSchema}.
*/
public CsvRowDeserializationSchema(TypeInformation<Row> rowTypeInfo) {
Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not be null !");
CsvMapper csvMapper = new CsvMapper();
this.rowTypeInfo = rowTypeInfo;
this.csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
this.objectReader = csvMapper.readerFor(JsonNode.class).with(csvSchema);
this.setNullValue("null");
@PublicEvolving
public static class Builder {
private final RowTypeInfo typeInfo;
private CsvSchema csvSchema;
private boolean ignoreParseErrors;
/**
* Creates a CSV deserialization schema for the given {@link TypeInformation} with
* optional parameters.
*/
public Builder(TypeInformation<Row> typeInfo) {
Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
if (!(typeInfo instanceof RowTypeInfo)) {
throw new IllegalArgumentException("Row type information expected.");
}
this.typeInfo = (RowTypeInfo) typeInfo;
this.csvSchema = CsvRowSchemaConverter.convert((RowTypeInfo) typeInfo);
}
public Builder setFieldDelimiter(char delimiter) {
this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(delimiter).build();
return this;
}
public Builder setAllowComments(boolean allowComments) {
this.csvSchema = this.csvSchema.rebuild().setAllowComments(allowComments).build();
return this;
}
public Builder setArrayElementDelimiter(String delimiter) {
Preconditions.checkNotNull(delimiter, "Array element delimiter must not be null.");
this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build();
return this;
}
public Builder setQuoteCharacter(char c) {
this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build();
return this;
}
public Builder setEscapeCharacter(char c) {
this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build();
return this;
}
public Builder setNullLiteral(String nullLiteral) {
Preconditions.checkNotNull(nullLiteral, "Null literal must not be null.");
this.csvSchema = this.csvSchema.rebuild().setNullValue(nullLiteral).build();
return this;
}
public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
this.ignoreParseErrors = ignoreParseErrors;
return this;
}
public CsvRowDeserializationSchema build() {
return new CsvRowDeserializationSchema(
typeInfo,
csvSchema,
ignoreParseErrors);
}
}
@Override
public Row deserialize(byte[] message) throws IOException {
JsonNode root = objectReader.readValue(message);
return convertRow(root, (RowTypeInfo) rowTypeInfo);
try {
final JsonNode root = objectReader.readValue(message);
return (Row) runtimeConverter.convert(root);
} catch (Throwable t) {
if (ignoreParseErrors) {
return null;
}
throw new IOException("Failed to deserialize CSV row '" + new String(message) + "'.", t);
}
}
@Override
......@@ -93,133 +174,195 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
@Override
public TypeInformation<Row> getProducedType() {
return rowTypeInfo;
return typeInfo;
}
private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
String[] fields = rowTypeInfo.getFieldNames();
TypeInformation<?>[] types = rowTypeInfo.getFieldTypes();
Row row = new Row(fields.length);
for (int i = 0; i < fields.length; i++) {
String columnName = fields[i];
JsonNode node = root.get(columnName);
row.setField(i, convert(node, types[i]));
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || o.getClass() != this.getClass()) {
return false;
}
return row;
final CsvRowDeserializationSchema that = (CsvRowDeserializationSchema) o;
final CsvSchema otherSchema = that.csvSchema;
return typeInfo.equals(that.typeInfo) &&
ignoreParseErrors == that.ignoreParseErrors &&
csvSchema.getColumnSeparator() == otherSchema.getColumnSeparator() &&
csvSchema.allowsComments() == otherSchema.allowsComments() &&
csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator()) &&
csvSchema.getQuoteChar() == otherSchema.getQuoteChar() &&
csvSchema.getEscapeChar() == otherSchema.getEscapeChar() &&
Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue());
}
private Row convertRow(ArrayNode node, RowTypeInfo rowTypeInfo) {
TypeInformation[] types = rowTypeInfo.getFieldTypes();
String[] fields = rowTypeInfo.getFieldNames();
Row row = new Row(fields.length);
for (int i = 0; i < fields.length; i++) {
row.setField(i, convert(node.get(i), types[i]));
}
return row;
@Override
public int hashCode() {
return Objects.hash(
typeInfo,
ignoreParseErrors,
csvSchema.getColumnSeparator(),
csvSchema.allowsComments(),
csvSchema.getArrayElementSeparator(),
csvSchema.getQuoteChar(),
csvSchema.getEscapeChar(),
csvSchema.getNullValue());
}
/**
* Converts json node to object with given type information.
* @param node json node to be converted.
* @param info type information for the json data.
* @return converted object
*/
private Object convert(JsonNode node, TypeInformation<?> info) {
if (node instanceof NullNode) {
return null;
}
if (info == Types.STRING) {
return node.asText();
} else if (info == Types.LONG) {
return node.asLong();
} else if (info == Types.INT) {
return node.asInt();
} else if (info == Types.DOUBLE) {
return node.asDouble();
} else if (info == Types.FLOAT) {
return Double.valueOf(node.asDouble()).floatValue();
} else if (info == Types.BIG_DEC) {
return BigDecimal.valueOf(node.asDouble());
} else if (info == Types.BIG_INT) {
return BigInteger.valueOf(node.asLong());
} else if (info == Types.SQL_DATE) {
return Date.valueOf(node.asText());
} else if (info == Types.SQL_TIME) {
return Time.valueOf(node.asText());
} else if (info == Types.SQL_TIMESTAMP) {
return Timestamp.valueOf(node.asText());
} else if (info == Types.BOOLEAN) {
return node.asBoolean();
} else if (info instanceof RowTypeInfo) {
return convertRow((ArrayNode) node, (RowTypeInfo) info);
} else if (info instanceof BasicArrayTypeInfo) {
return convertArray((ArrayNode) node, ((BasicArrayTypeInfo) info).getComponentInfo());
} else if (info instanceof PrimitiveArrayTypeInfo &&
((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
return convertByteArray((TextNode) node);
} else {
throw new RuntimeException("Unable to support type " + info.toString() + " yet");
}
// --------------------------------------------------------------------------------------------
private interface RuntimeConverter extends Serializable {
Object convert(JsonNode node);
}
private Object[] convertArray(ArrayNode node, TypeInformation<?> elementType) {
final Object[] array = (Object[]) Array.newInstance(elementType.getTypeClass(), node.size());
for (int i = 0; i < node.size(); i++) {
array[i] = convert(node.get(i), elementType);
}
return array;
private static RuntimeConverter createRowRuntimeConverter(
RowTypeInfo rowTypeInfo,
boolean ignoreParseErrors,
boolean isTopLevel) {
final TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
final String[] fieldNames = rowTypeInfo.getFieldNames();
final RuntimeConverter[] fieldConverters =
createFieldRuntimeConverters(ignoreParseErrors, fieldTypes);
return assembleRowRuntimeConverter(ignoreParseErrors, isTopLevel, fieldNames, fieldConverters);
}
private byte[] convertByteArray(TextNode node) {
try {
return node.asText().getBytes(charset);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Unsupport encoding charset" + charset, e);
private static RuntimeConverter[] createFieldRuntimeConverters(boolean ignoreParseErrors, TypeInformation<?>[] fieldTypes) {
final RuntimeConverter[] fieldConverters = new RuntimeConverter[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
fieldConverters[i] = createNullableRuntimeConverter(fieldTypes[i], ignoreParseErrors);
}
return fieldConverters;
}
public void setCharset(String charset) {
this.charset = charset;
}
private static RuntimeConverter assembleRowRuntimeConverter(
boolean ignoreParseErrors,
boolean isTopLevel,
String[] fieldNames,
RuntimeConverter[] fieldConverters) {
final int rowArity = fieldNames.length;
public void setFieldDelimiter(String s) {
if (s.length() != 1) {
throw new RuntimeException("FieldDelimiter's length must be one !");
}
this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(s.charAt(0)).build();
return (node) -> {
final int nodeSize = node.size();
validateArity(rowArity, nodeSize, ignoreParseErrors);
final Row row = new Row(rowArity);
for (int i = 0; i < Math.min(rowArity, nodeSize); i++) {
// Jackson only supports mapping by name in the first level
if (isTopLevel) {
row.setField(i, fieldConverters[i].convert(node.get(fieldNames[i])));
} else {
row.setField(i, fieldConverters[i].convert(node.get(i)));
}
}
return row;
};
}
public void setArrayElementDelimiter(String s) {
this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(s).build();
this.objectReader = objectReader.with(csvSchema);
private static RuntimeConverter createNullableRuntimeConverter(
TypeInformation<?> info,
boolean ignoreParseErrors) {
final RuntimeConverter valueConverter = createRuntimeConverter(info, ignoreParseErrors);
return (node) -> {
if (node.isNull()) {
return null;
}
try {
return valueConverter.convert(node);
} catch (Throwable t) {
if (!ignoreParseErrors) {
throw t;
}
return null;
}
};
}
public void setQuoteCharacter(char c) {
this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build();
this.objectReader = objectReader.with(csvSchema);
private static RuntimeConverter createRuntimeConverter(TypeInformation<?> info, boolean ignoreParseErrors) {
if (info.equals(Types.VOID)) {
return (node) -> null;
} else if (info.equals(Types.STRING)) {
return JsonNode::asText;
} else if (info.equals(Types.BOOLEAN)) {
return (node) -> Boolean.valueOf(node.asText().trim());
} else if (info.equals(Types.BYTE)) {
return (node) -> Byte.valueOf(node.asText().trim());
} else if (info.equals(Types.SHORT)) {
return (node) -> Short.valueOf(node.asText().trim());
} else if (info.equals(Types.INT)) {
return (node) -> Integer.valueOf(node.asText().trim());
} else if (info.equals(Types.LONG)) {
return (node) -> Long.valueOf(node.asText().trim());
} else if (info.equals(Types.FLOAT)) {
return (node) -> Float.valueOf(node.asText().trim());
} else if (info.equals(Types.DOUBLE)) {
return (node) -> Double.valueOf(node.asText().trim());
} else if (info.equals(Types.BIG_DEC)) {
return (node) -> new BigDecimal(node.asText().trim());
} else if (info.equals(Types.BIG_INT)) {
return (node) -> new BigInteger(node.asText().trim());
} else if (info.equals(Types.SQL_DATE)) {
return (node) -> Date.valueOf(node.asText());
} else if (info.equals(Types.SQL_TIME)) {
return (node) -> Time.valueOf(node.asText());
} else if (info.equals(Types.SQL_TIMESTAMP)) {
return (node) -> Timestamp.valueOf(node.asText());
} else if (info instanceof RowTypeInfo) {
final RowTypeInfo rowTypeInfo = (RowTypeInfo) info;
return createRowRuntimeConverter(rowTypeInfo, ignoreParseErrors, false);
} else if (info instanceof BasicArrayTypeInfo) {
return createObjectArrayRuntimeConverter(
((BasicArrayTypeInfo<?, ?>) info).getComponentInfo(),
ignoreParseErrors);
} else if (info instanceof ObjectArrayTypeInfo) {
return createObjectArrayRuntimeConverter(
((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo(),
ignoreParseErrors);
} else if (info instanceof PrimitiveArrayTypeInfo &&
((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
return createByteArrayRuntimeConverter(ignoreParseErrors);
} else {
throw new RuntimeException("Unsupported type information '" + info + "'.");
}
}
public void setEscapeCharacter(char c) {
this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build();
this.objectReader = objectReader.with(csvSchema);
private static RuntimeConverter createObjectArrayRuntimeConverter(
TypeInformation<?> elementType,
boolean ignoreParseErrors) {
final Class<?> elementClass = elementType.getTypeClass();
final RuntimeConverter elementConverter = createNullableRuntimeConverter(elementType, ignoreParseErrors);
return (node) -> {
final int nodeSize = node.size();
final Object[] array = (Object[]) Array.newInstance(elementClass, nodeSize);
for (int i = 0; i < nodeSize; i++) {
array[i] = elementConverter.convert(node.get(i));
}
return array;
};
}
public void setNullValue(String s) {
this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build();
this.objectReader = objectReader.with(csvSchema);
private static RuntimeConverter createByteArrayRuntimeConverter(boolean ignoreParseErrors) {
return (node) -> {
try {
return node.binaryValue();
} catch (IOException e) {
if (!ignoreParseErrors) {
throw new RuntimeException("Unable to deserialize byte array.", e);
}
return null;
}
};
}
@Override
public boolean equals(Object o) {
if (o == null || o.getClass() != this.getClass()) {
return false;
private static void validateArity(int expected, int actual, boolean ignoreParseErrors) {
if (expected != actual && !ignoreParseErrors) {
throw new RuntimeException("Row length mismatch. " + expected +
" fields expected but was " + actual + ".");
}
if (this == o) {
return true;
}
final CsvRowDeserializationSchema that = (CsvRowDeserializationSchema) o;
return rowTypeInfo.equals(that.rowTypeInfo) &&
csvSchema.toString().equals(that.csvSchema.toString());
}
}
......@@ -21,123 +21,117 @@ package org.apache.flink.formats.csv;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.descriptors.CsvValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.TableFormatFactoryBase;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Table format for providing configured instances of CSV-to-row {@link SerializationSchema}
* Table format factory for providing configured instances of CSV-to-row {@link SerializationSchema}
* and {@link DeserializationSchema}.
*/
public final class CsvRowFormatFactory implements SerializationSchemaFactory<Row>,
DeserializationSchemaFactory<Row> {
public final class CsvRowFormatFactory extends TableFormatFactoryBase<Row>
implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {
@Override
public Map<String, String> requiredContext() {
final Map<String, String> context = new HashMap<>();
context.put(FormatDescriptorValidator.FORMAT_TYPE(), CsvValidator.FORMAT_TYPE_VALUE());
context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1");
return context;
public CsvRowFormatFactory() {
super(CsvValidator.FORMAT_TYPE_VALUE, 1, true);
}
@Override
public boolean supportsSchemaDerivation() {
return true;
}
@Override
public List<String> supportedProperties() {
public List<String> supportedFormatProperties() {
final List<String> properties = new ArrayList<>();
properties.add(CsvValidator.FORMAT_FIELDS() + ".#." + DescriptorProperties.TYPE());
properties.add(CsvValidator.FORMAT_FIELDS() + ".#." + DescriptorProperties.NAME());
properties.add(CsvValidator.FORMAT_FIELD_DELIMITER());
properties.add(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER());
properties.add(CsvValidator.FORMAT_QUOTE_CHARACTER());
properties.add(CsvValidator.FORMAT_ESCAPE_CHARACTER());
properties.add(CsvValidator.FORMAT_BYTES_CHARSET());
properties.add(CsvValidator.FORMAT_NULL_VALUE());
properties.add(SchemaValidator.SCHEMA() + ".#." + SchemaValidator.SCHEMA_TYPE());
properties.add(SchemaValidator.SCHEMA() + ".#." + SchemaValidator.SCHEMA_NAME());
properties.add(CsvValidator.FORMAT_FIELD_DELIMITER);
properties.add(CsvValidator.FORMAT_LINE_DELIMITER);
properties.add(CsvValidator.FORMAT_QUOTE_CHARACTER);
properties.add(CsvValidator.FORMAT_ALLOW_COMMENTS);
properties.add(CsvValidator.FORMAT_IGNORE_PARSE_ERRORS);
properties.add(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER);
properties.add(CsvValidator.FORMAT_ESCAPE_CHARACTER);
properties.add(CsvValidator.FORMAT_NULL_LITERAL);
properties.add(CsvValidator.FORMAT_SCHEMA);
return properties;
}
@Override
public DeserializationSchema<Row> createDeserializationSchema(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = validateAndGetProperties(properties);
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
final CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema(
final CsvRowDeserializationSchema.Builder schemaBuilder = new CsvRowDeserializationSchema.Builder(
createTypeInformation(descriptorProperties));
// update csv schema with properties
descriptorProperties.getOptionalString(CsvValidator.FORMAT_FIELD_DELIMITER())
.ifPresent(schema::setFieldDelimiter);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER())
.ifPresent(schema::setArrayElementDelimiter);
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER())
.ifPresent(schema::setQuoteCharacter);
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER())
.ifPresent(schema::setEscapeCharacter);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_BYTES_CHARSET())
.ifPresent(schema::setCharset);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_VALUE())
.ifPresent(schema::setCharset);
return new CsvRowDeserializationSchema(createTypeInformation(descriptorProperties));
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_FIELD_DELIMITER)
.ifPresent(schemaBuilder::setFieldDelimiter);
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER)
.ifPresent(schemaBuilder::setQuoteCharacter);
descriptorProperties.getOptionalBoolean(CsvValidator.FORMAT_ALLOW_COMMENTS)
.ifPresent(schemaBuilder::setAllowComments);
descriptorProperties.getOptionalBoolean(CsvValidator.FORMAT_IGNORE_PARSE_ERRORS)
.ifPresent(schemaBuilder::setIgnoreParseErrors);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER)
.ifPresent(schemaBuilder::setArrayElementDelimiter);
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER)
.ifPresent(schemaBuilder::setEscapeCharacter);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_LITERAL)
.ifPresent(schemaBuilder::setNullLiteral);
return schemaBuilder.build();
}
@Override
public SerializationSchema<Row> createSerializationSchema(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = validateAndGetProperties(properties);
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
final CsvRowSerializationSchema schema = new CsvRowSerializationSchema(
final CsvRowSerializationSchema.Builder schemaBuilder = new CsvRowSerializationSchema.Builder(
createTypeInformation(descriptorProperties));
// update csv schema with properties
descriptorProperties.getOptionalString(CsvValidator.FORMAT_FIELD_DELIMITER())
.ifPresent(schema::setFieldDelimiter);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER())
.ifPresent(schema::setArrayElementDelimiter);
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER())
.ifPresent(schema::setQuoteCharacter);
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER())
.ifPresent(schema::setEscapeCharacter);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_BYTES_CHARSET())
.ifPresent(schema::setCharset);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_VALUE())
.ifPresent(schema::setCharset);
return new CsvRowSerializationSchema(createTypeInformation(descriptorProperties));
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_FIELD_DELIMITER)
.ifPresent(schemaBuilder::setFieldDelimiter);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_LINE_DELIMITER)
.ifPresent(schemaBuilder::setLineDelimiter);
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_QUOTE_CHARACTER)
.ifPresent(schemaBuilder::setQuoteCharacter);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER)
.ifPresent(schemaBuilder::setArrayElementDelimiter);
descriptorProperties.getOptionalCharacter(CsvValidator.FORMAT_ESCAPE_CHARACTER)
.ifPresent(schemaBuilder::setEscapeCharacter);
descriptorProperties.getOptionalString(CsvValidator.FORMAT_NULL_LITERAL)
.ifPresent(schemaBuilder::setNullLiteral);
return schemaBuilder.build();
}
private static DescriptorProperties validateAndGetProperties(Map<String, String> propertiesMap) {
private static DescriptorProperties getValidatedProperties(Map<String, String> propertiesMap) {
final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
descriptorProperties.putProperties(propertiesMap);
// validate
new CsvValidator().validate(descriptorProperties);
return descriptorProperties;
}
/**
* Create a {@link TypeInformation} based on the "format-fields" in {@link CsvValidator}.
* @param descriptorProperties descriptor properties
* @return {@link TypeInformation}
*/
private static TypeInformation<Row> createTypeInformation(DescriptorProperties descriptorProperties) {
if (descriptorProperties.getOptionalTableSchema(CsvValidator.FORMAT_FIELDS()).isPresent()) {
return descriptorProperties.getTableSchema(CsvValidator.FORMAT_FIELDS()).toRowType();
if (descriptorProperties.containsKey(CsvValidator.FORMAT_SCHEMA)) {
return (RowTypeInfo) descriptorProperties.getType(CsvValidator.FORMAT_SCHEMA);
} else {
return SchemaValidator.deriveFormatFields(descriptorProperties).toRowType();
return deriveSchema(descriptorProperties.asMap()).toRowType();
}
}
}
......@@ -25,77 +25,108 @@ import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
/**
* Converting functions that related to {@link CsvSchema}.
* In {@link CsvSchema}, there are four types(string,number,boolean
* and array), in order to satisfy various flink types, this class
* sorts out instances of {@link TypeInformation} and convert them to
* one of CsvSchema's types.
* Converter functions that convert Flink's type information to Jackson's {@link CsvSchema}.
*
* <p>In {@link CsvSchema}, there are four types (string, number, boolean, and array). In order
* to satisfy various Flink types, this class sorts out instances of {@link TypeInformation} that
* are not supported. It converts supported types to one of CsvSchema's types.
*
* <p>Note: Changes in this class need to be kept in sync with the corresponding runtime
* classes {@link CsvRowDeserializationSchema} and {@link CsvRowSerializationSchema}.
*/
public final class CsvRowSchemaConverter {
private CsvRowSchemaConverter() {
// private
}
/**
* Types that can be converted to ColumnType.NUMBER.
*
* <p>From Jackson: Value should be a number, but literals "null", "true" and "false" are also
* understood, and an empty String is considered null. Values are also trimmed (leading/trailing
* white space). Other non-numeric Strings may cause parsing exception.
*/
private static final HashSet<TypeInformation<?>> NUMBER_TYPES =
new HashSet<>(Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT,
Types.BIG_DEC, Types.BIG_INT));
new HashSet<>(Arrays.asList(Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.DOUBLE,
Types.FLOAT, Types.BIG_DEC, Types.BIG_INT));
/**
* Types that can be converted to ColumnType.STRING.
*
* <p>From Jackson: Default type if not explicitly defined; no type-inference is performed,
* and value is not trimmed.
*/
private static final HashSet<TypeInformation<?>> STRING_TYPES =
new HashSet<>(Arrays.asList(Types.STRING, Types.SQL_DATE,
Types.SQL_TIME, Types.SQL_TIMESTAMP));
new HashSet<>(Arrays.asList(Types.STRING, Types.SQL_DATE, Types.SQL_TIME, Types.SQL_TIMESTAMP));
/**
* Types that can be converted to ColumnType.BOOLEAN.
*
* <p>From Jackson: Value is expected to be a boolean ("true", "false") String, or "null", or
* empty String (equivalent to null). Values are trimmed (leading/trailing white space).
* Values other than indicated above may result in an exception.
*/
private static final HashSet<TypeInformation<?>> BOOLEAN_TYPES =
new HashSet<>(Collections.singletonList(Types.BOOLEAN));
new HashSet<>(Arrays.asList(Types.BOOLEAN, Types.VOID));
/**
* Convert {@link RowTypeInfo} to {@link CsvSchema}.
*/
public static CsvSchema rowTypeToCsvSchema(RowTypeInfo rowType) {
Builder builder = new CsvSchema.Builder();
String[] fields = rowType.getFieldNames();
TypeInformation<?>[] infos = rowType.getFieldTypes();
public static CsvSchema convert(RowTypeInfo rowType) {
final Builder builder = new CsvSchema.Builder();
final String[] fields = rowType.getFieldNames();
final TypeInformation<?>[] types = rowType.getFieldTypes();
for (int i = 0; i < rowType.getArity(); i++) {
builder.addColumn(new Column(i, fields[i], convertType(infos[i])));
builder.addColumn(new Column(i, fields[i], convertType(fields[i], types[i])));
}
return builder.build();
}
/**
* Convert {@link TypeInformation} to {@link CsvSchema.ColumnType}
* based on their catogories.
* Convert {@link TypeInformation} to {@link CsvSchema.ColumnType} based on Jackson's categories.
*/
private static CsvSchema.ColumnType convertType(TypeInformation<?> info) {
private static CsvSchema.ColumnType convertType(String fieldName, TypeInformation<?> info) {
if (STRING_TYPES.contains(info)) {
return CsvSchema.ColumnType.STRING;
} else if (NUMBER_TYPES.contains(info)) {
return CsvSchema.ColumnType.NUMBER;
} else if (BOOLEAN_TYPES.contains(info)) {
return CsvSchema.ColumnType.BOOLEAN;
} else if (info instanceof ObjectArrayTypeInfo
|| info instanceof BasicArrayTypeInfo
|| info instanceof RowTypeInfo) {
} else if (info instanceof ObjectArrayTypeInfo) {
validateNestedField(fieldName, ((ObjectArrayTypeInfo) info).getComponentInfo());
return CsvSchema.ColumnType.ARRAY;
} else if (info instanceof BasicArrayTypeInfo) {
validateNestedField(fieldName, ((BasicArrayTypeInfo) info).getComponentInfo());
return CsvSchema.ColumnType.ARRAY;
} else if (info instanceof RowTypeInfo) {
final TypeInformation<?>[] types = ((RowTypeInfo) info).getFieldTypes();
for (TypeInformation<?> type : types) {
validateNestedField(fieldName, type);
}
return CsvSchema.ColumnType.ARRAY;
} else if (info instanceof PrimitiveArrayTypeInfo &&
((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
return CsvSchema.ColumnType.STRING;
} else {
throw new RuntimeException("Unable to support " + info.toString()
+ " yet");
throw new IllegalArgumentException(
"Unsupported type information '" + info.toString() + "' for field '" + fieldName + "'.");
}
}
private static void validateNestedField(String fieldName, TypeInformation<?> info) {
if (!NUMBER_TYPES.contains(info) && !STRING_TYPES.contains(info) && !BOOLEAN_TYPES.contains(info)) {
throw new IllegalArgumentException(
"Only simple types are supported in the second level nesting of fields '" +
fieldName + "' but was: " + info);
}
}
}
......@@ -18,30 +18,30 @@
package org.apache.flink.formats.csv;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import java.io.UnsupportedEncodingException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Objects;
/**
* Serialization schema that serializes an object of Flink types into CSV bytes.
......@@ -51,33 +51,105 @@ import java.sql.Timestamp;
*
* <p>Result <code>byte[]</code> messages can be deserialized using {@link CsvRowDeserializationSchema}.
*/
@Public
@PublicEvolving
public final class CsvRowSerializationSchema implements SerializationSchema<Row> {
/** Schema describing the input csv data. */
private CsvSchema csvSchema;
private static final long serialVersionUID = 2098447220136965L;
/** Type information describing the input csv data. */
private TypeInformation<Row> rowTypeInfo;
/** Type information describing the input CSV data. */
private final RowTypeInfo typeInfo;
/** Runtime instance that performs the actual work. */
private final RuntimeConverter runtimeConverter;
/** CsvMapper used to write {@link JsonNode} into bytes. */
private CsvMapper csvMapper = new CsvMapper();
private final CsvMapper csvMapper;
/** Schema describing the input CSV data. */
private final CsvSchema csvSchema;
/** Object writer used to write rows. It is configured by {@link CsvSchema}. */
private final ObjectWriter objectWriter;
/** Reusable object node. */
private ObjectNode root;
private transient ObjectNode root;
/** Charset for byte[]. */
private String charset = "UTF-8";
private CsvRowSerializationSchema(
RowTypeInfo typeInfo,
CsvSchema csvSchema) {
this.typeInfo = typeInfo;
this.runtimeConverter = createRowRuntimeConverter(typeInfo, true);
this.csvMapper = new CsvMapper();
this.csvSchema = csvSchema;
this.objectWriter = csvMapper.writer(csvSchema);
}
/**
* Create a {@link CsvRowSerializationSchema} with given {@link TypeInformation}.
* @param rowTypeInfo type information used to create schem.
* A builder for creating a {@link CsvRowSerializationSchema}.
*/
CsvRowSerializationSchema(TypeInformation<Row> rowTypeInfo) {
Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not be null !");
this.rowTypeInfo = rowTypeInfo;
this.csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
this.setNullValue("null");
@PublicEvolving
public static class Builder {
private final RowTypeInfo typeInfo;
private CsvSchema csvSchema;
/**
* Creates a {@link CsvRowSerializationSchema} expecting the given {@link TypeInformation}.
*
* @param typeInfo type information used to create schema.
*/
public Builder(TypeInformation<Row> typeInfo) {
Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
if (!(typeInfo instanceof RowTypeInfo)) {
throw new IllegalArgumentException("Row type information expected.");
}
this.typeInfo = (RowTypeInfo) typeInfo;
this.csvSchema = CsvRowSchemaConverter.convert((RowTypeInfo) typeInfo);
}
public Builder setFieldDelimiter(char c) {
this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(c).build();
return this;
}
public Builder setLineDelimiter(String delimiter) {
Preconditions.checkNotNull(delimiter, "Delimiter must not be null.");
if (!delimiter.equals("\n") && !delimiter.equals("\r") && !delimiter.equals("\r\n")) {
throw new IllegalArgumentException(
"Unsupported new line delimiter. Only \\n, \\r, or \\r\\n are supported.");
}
this.csvSchema = this.csvSchema.rebuild().setLineSeparator(delimiter).build();
return this;
}
public Builder setArrayElementDelimiter(String delimiter) {
Preconditions.checkNotNull(delimiter, "Delimiter must not be null.");
this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build();
return this;
}
public Builder setQuoteCharacter(char c) {
this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build();
return this;
}
public Builder setEscapeCharacter(char c) {
this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build();
return this;
}
public Builder setNullLiteral(String s) {
this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build();
return this;
}
public CsvRowSerializationSchema build() {
return new CsvRowSerializationSchema(
typeInfo,
csvSchema);
}
}
@Override
......@@ -86,157 +158,177 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row>
root = csvMapper.createObjectNode();
}
try {
convertRow(root, row, (RowTypeInfo) rowTypeInfo);
return csvMapper.writer(csvSchema).writeValueAsBytes(root);
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not serialize row '" + row + "'. " +
"Make sure that the schema matches the input.", e);
runtimeConverter.convert(csvMapper, root, row);
return objectWriter.writeValueAsBytes(root);
} catch (Throwable t) {
throw new RuntimeException("Could not serialize row '" + row + "'.", t);
}
}
private void convertRow(ObjectNode reuse, Row row, RowTypeInfo rowTypeInfo) {
if (reuse == null) {
reuse = csvMapper.createObjectNode();
}
if (row.getArity() != rowTypeInfo.getFieldNames().length) {
throw new IllegalStateException(String.format(
"Number of elements in the row '%s' is different from number of field names: %d",
row, rowTypeInfo.getFieldNames().length));
@Override
public boolean equals(Object o) {
if (o == null || o.getClass() != this.getClass()) {
return false;
}
TypeInformation[] types = rowTypeInfo.getFieldTypes();
String[] fields = rowTypeInfo.getFieldNames();
for (int i = 0; i < types.length; i++) {
String columnName = fields[i];
Object obj = row.getField(i);
reuse.set(columnName, convert(reuse, obj, types[i], false));
if (this == o) {
return true;
}
final CsvRowSerializationSchema that = (CsvRowSerializationSchema) o;
final CsvSchema otherSchema = that.csvSchema;
return typeInfo.equals(that.typeInfo) &&
csvSchema.getColumnSeparator() == otherSchema.getColumnSeparator() &&
Arrays.equals(csvSchema.getLineSeparator(), otherSchema.getLineSeparator()) &&
csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator()) &&
csvSchema.getQuoteChar() == otherSchema.getQuoteChar() &&
csvSchema.getEscapeChar() == otherSchema.getEscapeChar() &&
Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue());
}
/**
* Converts an object to a {@link JsonNode}.
*
* @param container {@link ContainerNode} used to create the {@link JsonNode}.
* @param obj object that is converted to a {@link JsonNode}.
* @param info type information that decides the type of the {@link JsonNode}.
* @param nested flag that indicates whether the object is part of a nested structure
*               such as a string in an array.
* @return the converted {@link JsonNode}.
*/
private JsonNode convert(ContainerNode<?> container, Object obj, TypeInformation info, Boolean nested) {
if (obj == null) {
return container.nullNode();
}
if (info == Types.STRING) {
return container.textNode((String) obj);
} else if (info == Types.LONG) {
return container.numberNode((Long) obj);
} else if (info == Types.INT) {
return container.numberNode((Integer) obj);
} else if (info == Types.DOUBLE) {
return container.numberNode((Double) obj);
} else if (info == Types.FLOAT) {
return container.numberNode((Float) obj);
} else if (info == Types.BIG_DEC) {
return container.numberNode(new BigDecimal(String.valueOf(obj)));
} else if (info == Types.BIG_INT) {
return container.numberNode(BigInteger.valueOf(Long.valueOf(String.valueOf(obj))));
} else if (info == Types.SQL_DATE) {
return container.textNode(Date.valueOf(String.valueOf(obj)).toString());
} else if (info == Types.SQL_TIME) {
return container.textNode(Time.valueOf(String.valueOf(obj)).toString());
} else if (info == Types.SQL_TIMESTAMP) {
return container.textNode(Timestamp.valueOf(String.valueOf(obj)).toString());
} else if (info == Types.BOOLEAN) {
return container.booleanNode((Boolean) obj);
} else if (info instanceof RowTypeInfo) {
if (nested) {
throw new RuntimeException("Unable to support nested row type " + info.toString() + " yet");
}
return convertArray((Row) obj, (RowTypeInfo) info);
} else if (info instanceof BasicArrayTypeInfo) {
if (nested) {
throw new RuntimeException("Unable to support nested array type " + info.toString() + " yet");
}
return convertArray((Object[]) obj,
((BasicArrayTypeInfo) info).getComponentInfo());
} else if (info instanceof PrimitiveArrayTypeInfo &&
((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
/* We convert byte[] to a TextNode instead of a BinaryNode here,
because a BinaryNode instance would be serialized to a base64 string in
{@link com.fasterxml.jackson.databind.node.BinaryNode#serialize(JsonGenerator, SerializerProvider)},
which is unacceptable for users.
*/
try {
return container.textNode(new String((byte[]) obj, charset));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Unsupport encoding charset " + charset, e);
}
} else {
throw new RuntimeException("Unable to support type " + info.toString() + " yet");
}
@Override
public int hashCode() {
return Objects.hash(
typeInfo,
csvSchema.getColumnSeparator(),
csvSchema.getLineSeparator(),
csvSchema.getArrayElementSeparator(),
csvSchema.getQuoteChar(),
csvSchema.getEscapeChar(),
csvSchema.getNullValue());
}
/**
* Uses an {@link ArrayNode} to represent a row.
*/
private ArrayNode convertArray(Row row, RowTypeInfo rowTypeInfo) {
ArrayNode arrayNode = csvMapper.createArrayNode();
TypeInformation[] types = rowTypeInfo.getFieldTypes();
String[] fields = rowTypeInfo.getFieldNames();
for (int i = 0; i < fields.length; i++) {
arrayNode.add(convert(arrayNode, row.getField(i), types[i], true));
}
return arrayNode;
// --------------------------------------------------------------------------------------------
private interface RuntimeConverter extends Serializable {
JsonNode convert(CsvMapper csvMapper, ContainerNode<?> container, Object obj);
}
/**
* Uses an {@link ArrayNode} to represent an array.
*/
private ArrayNode convertArray(Object[] obj, TypeInformation elementInfo) {
ArrayNode arrayNode = csvMapper.createArrayNode();
for (Object elementObj : obj) {
arrayNode.add(convert(arrayNode, elementObj, elementInfo, true));
}
return arrayNode;
private static RuntimeConverter createRowRuntimeConverter(RowTypeInfo rowTypeInfo, boolean isTopLevel) {
final TypeInformation[] fieldTypes = rowTypeInfo.getFieldTypes();
final String[] fieldNames = rowTypeInfo.getFieldNames();
final RuntimeConverter[] fieldConverters = createFieldRuntimeConverters(fieldTypes);
return assembleRowRuntimeConverter(isTopLevel, fieldNames, fieldConverters);
}
public void setCharset(String charset) {
this.charset = charset;
private static RuntimeConverter[] createFieldRuntimeConverters(TypeInformation<?>[] fieldTypes) {
final RuntimeConverter[] fieldConverters = new RuntimeConverter[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
fieldConverters[i] = createNullableRuntimeConverter(fieldTypes[i]);
}
return fieldConverters;
}
public void setFieldDelimiter(String s) {
if (s.length() != 1) {
throw new RuntimeException("FieldDelimiter's length must be one !");
private static RuntimeConverter assembleRowRuntimeConverter(
boolean isTopLevel,
String[] fieldNames,
RuntimeConverter[] fieldConverters) {
final int rowArity = fieldNames.length;
// top level reuses the object node container
if (isTopLevel) {
return (csvMapper, container, obj) -> {
final Row row = (Row) obj;
validateArity(rowArity, row.getArity());
final ObjectNode objectNode = (ObjectNode) container;
for (int i = 0; i < rowArity; i++) {
objectNode.set(
fieldNames[i],
fieldConverters[i].convert(csvMapper, container, row.getField(i)));
}
return objectNode;
};
} else {
return (csvMapper, container, obj) -> {
final Row row = (Row) obj;
validateArity(rowArity, row.getArity());
final ArrayNode arrayNode = csvMapper.createArrayNode();
for (int i = 0; i < rowArity; i++) {
arrayNode.add(fieldConverters[i].convert(csvMapper, arrayNode, row.getField(i)));
}
return arrayNode;
};
}
this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(s.charAt(0)).build();
}
public void setArrayElementDelimiter(String s) {
this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(s).build();
private static RuntimeConverter createNullableRuntimeConverter(TypeInformation<?> info) {
final RuntimeConverter valueConverter = createRuntimeConverter(info);
return (csvMapper, container, obj) -> {
if (obj == null) {
return container.nullNode();
}
return valueConverter.convert(csvMapper, container, obj);
};
}
public void setQuoteCharacter(char c) {
this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build();
private static RuntimeConverter createRuntimeConverter(TypeInformation<?> info) {
if (info.equals(Types.VOID)) {
return (csvMapper, container, obj) -> container.nullNode();
} else if (info.equals(Types.STRING)) {
return (csvMapper, container, obj) -> container.textNode((String) obj);
} else if (info.equals(Types.BOOLEAN)) {
return (csvMapper, container, obj) -> container.booleanNode((Boolean) obj);
} else if (info.equals(Types.BYTE)) {
return (csvMapper, container, obj) -> container.numberNode((Byte) obj);
} else if (info.equals(Types.SHORT)) {
return (csvMapper, container, obj) -> container.numberNode((Short) obj);
} else if (info.equals(Types.INT)) {
return (csvMapper, container, obj) -> container.numberNode((Integer) obj);
} else if (info.equals(Types.LONG)) {
return (csvMapper, container, obj) -> container.numberNode((Long) obj);
} else if (info.equals(Types.FLOAT)) {
return (csvMapper, container, obj) -> container.numberNode((Float) obj);
} else if (info.equals(Types.DOUBLE)) {
return (csvMapper, container, obj) -> container.numberNode((Double) obj);
} else if (info.equals(Types.BIG_DEC)) {
return (csvMapper, container, obj) -> container.numberNode((BigDecimal) obj);
} else if (info.equals(Types.BIG_INT)) {
return (csvMapper, container, obj) -> container.numberNode((BigInteger) obj);
} else if (info.equals(Types.SQL_DATE)) {
return (csvMapper, container, obj) -> container.textNode(obj.toString());
} else if (info.equals(Types.SQL_TIME)) {
return (csvMapper, container, obj) -> container.textNode(obj.toString());
} else if (info.equals(Types.SQL_TIMESTAMP)) {
return (csvMapper, container, obj) -> container.textNode(obj.toString());
} else if (info instanceof RowTypeInfo) {
return createRowRuntimeConverter((RowTypeInfo) info, false);
} else if (info instanceof BasicArrayTypeInfo) {
return createObjectArrayRuntimeConverter(((BasicArrayTypeInfo) info).getComponentInfo());
} else if (info instanceof ObjectArrayTypeInfo) {
return createObjectArrayRuntimeConverter(((ObjectArrayTypeInfo) info).getComponentInfo());
} else if (info instanceof PrimitiveArrayTypeInfo &&
((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
return createByteArrayRuntimeConverter();
} else {
throw new RuntimeException("Unsupported type information '" + info + "'.");
}
}
public void setEscapeCharacter(char c) {
this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build();
private static RuntimeConverter createObjectArrayRuntimeConverter(TypeInformation<?> elementType) {
final RuntimeConverter elementConverter = createNullableRuntimeConverter(elementType);
return (csvMapper, container, obj) -> {
final Object[] array = (Object[]) obj;
final ArrayNode arrayNode = csvMapper.createArrayNode();
for (Object element : array) {
arrayNode.add(elementConverter.convert(csvMapper, arrayNode, element));
}
return arrayNode;
};
}
public void setNullValue(String s) {
this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build();
private static RuntimeConverter createByteArrayRuntimeConverter() {
return (csvMapper, container, obj) -> container.binaryNode((byte[]) obj);
}
@Override
public boolean equals(Object o) {
if (o == null || o.getClass() != this.getClass()) {
return false;
private static void validateArity(int expected, int actual) {
if (expected != actual) {
throw new RuntimeException("Row length mismatch. " + expected +
" fields expected but was " + actual + ".");
}
if (this == o) {
return true;
}
final CsvRowSerializationSchema that = (CsvRowSerializationSchema) o;
return rowTypeInfo.equals(that.rowTypeInfo) &&
csvSchema.toString().equals(that.csvSchema.toString());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.utils.TypeStringUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import java.util.Map;
import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ALLOW_COMMENTS;
import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER;
import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ESCAPE_CHARACTER;
import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_FIELD_DELIMITER;
import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_IGNORE_PARSE_ERRORS;
import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_LINE_DELIMITER;
import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_NULL_LITERAL;
import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_QUOTE_CHARACTER;
import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_SCHEMA;
import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_TYPE_VALUE;
import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
/**
* Format descriptor for comma-separated values (CSV).
*
* <p>This descriptor aims to comply with RFC-4180 ("Common Format and MIME Type for
* Comma-Separated Values (CSV) Files") proposed by the Internet Engineering Task Force (IETF).
*
* <p>Note: This descriptor does not describe Flink's old non-standard CSV table
* source/sink. Currently, this descriptor can be used when writing to Kafka. The old one is
* still available under "org.apache.flink.table.descriptors.OldCsv" for stream/batch
* filesystem operations.
*/
@PublicEvolving
public class Csv extends FormatDescriptor {
private DescriptorProperties internalProperties = new DescriptorProperties(true);
/**
* Format descriptor for comma-separated values (CSV).
*
* <p>This descriptor aims to comply with RFC-4180 ("Common Format and MIME Type for
* Comma-Separated Values (CSV) Files") proposed by the Internet Engineering Task Force (IETF).
*/
public Csv() {
super(FORMAT_TYPE_VALUE, 1);
}
/**
* Sets the field delimiter character (',' by default).
*
* @param delimiter the field delimiter character
*/
public Csv fieldDelimiter(char delimiter) {
internalProperties.putCharacter(FORMAT_FIELD_DELIMITER, delimiter);
return this;
}
/**
* Sets the line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed).
*
* @param delimiter the line delimiter
*/
public Csv lineDelimiter(String delimiter) {
Preconditions.checkNotNull(delimiter);
internalProperties.putString(FORMAT_LINE_DELIMITER, delimiter);
return this;
}
/**
* Sets the quote character for enclosing field values ('"' by default).
*
* @param quoteCharacter the quote character
*/
public Csv quoteCharacter(char quoteCharacter) {
internalProperties.putCharacter(FORMAT_QUOTE_CHARACTER, quoteCharacter);
return this;
}
/**
* Ignores comment lines that start with '#' (disabled by default). If enabled, make sure to
* also ignore parse errors to allow empty rows.
*/
public Csv allowComments() {
internalProperties.putBoolean(FORMAT_ALLOW_COMMENTS, true);
return this;
}
/**
* Skips fields and rows with parse errors instead of failing. Fields are set to {@code null}
* in case of errors. By default, an exception is thrown.
*/
public Csv ignoreParseErrors() {
internalProperties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, true);
return this;
}
/**
* Sets the array element delimiter string for separating array or row element
* values (";" by default).
*
* @param delimiter the array element delimiter
*/
public Csv arrayElementDelimiter(String delimiter) {
Preconditions.checkNotNull(delimiter);
internalProperties.putString(FORMAT_ARRAY_ELEMENT_DELIMITER, delimiter);
return this;
}
/**
* Sets the escape character for escaping values (disabled by default).
*
* @param escapeCharacter escaping character (e.g. backslash)
*/
public Csv escapeCharacter(char escapeCharacter) {
internalProperties.putCharacter(FORMAT_ESCAPE_CHARACTER, escapeCharacter);
return this;
}
/**
* Sets the null literal string that is interpreted as a null value (disabled by default).
*
* @param nullLiteral null literal (e.g. "null" or "n/a")
*/
public Csv nullLiteral(String nullLiteral) {
Preconditions.checkNotNull(nullLiteral);
internalProperties.putString(FORMAT_NULL_LITERAL, nullLiteral);
return this;
}
/**
* Sets the format schema with field names and types. Required if the schema is not derived.
*
* @param schemaType type information that describes the schema
*/
public Csv schema(TypeInformation<Row> schemaType) {
Preconditions.checkNotNull(schemaType);
internalProperties.putString(FORMAT_SCHEMA, TypeStringUtils.writeTypeInfo(schemaType));
return this;
}
/**
* Derives the format schema from the table's schema. Required if no format schema is defined.
*
* <p>This allows for defining schema information only once.
*
* <p>The names, types, and fields' order of the format are determined by the table's
* schema. Time attributes are ignored if their origin is not a field. A "from" definition
* is interpreted as a field renaming in the format.
*/
public Csv deriveSchema() {
internalProperties.putBoolean(FORMAT_DERIVE_SCHEMA, true);
return this;
}
@Override
protected Map<String, String> toFormatProperties() {
final DescriptorProperties properties = new DescriptorProperties();
properties.putProperties(internalProperties);
return properties.asMap();
}
}
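As a rough illustration (not part of the commit itself), the property map such a descriptor produces, following the key constants defined in CsvValidator below and the expected properties in CsvTest:

{% highlight java %}
import java.util.Map;

import org.apache.flink.table.descriptors.Csv;

// Hypothetical configuration: derive the schema and customize two options.
Map<String, String> props = new Csv()
	.fieldDelimiter(';')
	.nullLiteral("n/a")
	.deriveSchema()
	.toProperties();

// Expected to contain, among others:
//   format.type             = csv
//   format.property-version = 1
//   format.field-delimiter  = ;
//   format.null-literal     = n/a
//   format.derive-schema    = true
{% endhighlight %}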
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import java.util.Arrays;
/**
* Validator for {@link Csv}.
*/
@Internal
public class CsvValidator extends FormatDescriptorValidator {
public static final String FORMAT_TYPE_VALUE = "csv";
public static final String FORMAT_FIELD_DELIMITER = "format.field-delimiter";
public static final String FORMAT_LINE_DELIMITER = "format.line-delimiter";
public static final String FORMAT_QUOTE_CHARACTER = "format.quote-character";
public static final String FORMAT_ALLOW_COMMENTS = "format.allow-comments";
public static final String FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors";
public static final String FORMAT_ARRAY_ELEMENT_DELIMITER = "format.array-element-delimiter";
public static final String FORMAT_ESCAPE_CHARACTER = "format.escape-character";
public static final String FORMAT_NULL_LITERAL = "format.null-literal";
public static final String FORMAT_SCHEMA = "format.schema";
@Override
public void validate(DescriptorProperties properties) {
super.validate(properties);
properties.validateString(FORMAT_FIELD_DELIMITER, true, 1, 1);
properties.validateEnumValues(FORMAT_LINE_DELIMITER, true, Arrays.asList("\r", "\n", "\r\n"));
properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1);
properties.validateBoolean(FORMAT_ALLOW_COMMENTS, true);
properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true);
properties.validateString(FORMAT_ARRAY_ELEMENT_DELIMITER, true, 1);
properties.validateString(FORMAT_ESCAPE_CHARACTER, true, 1, 1);
properties.validateBoolean(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA, true);
final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA);
final boolean isDerived = properties
.getOptionalBoolean(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA)
.orElse(false);
if (isDerived && hasSchema) {
throw new ValidationException(
"Format cannot define a schema and derive from the table's schema at the same time.");
} else if (hasSchema) {
properties.validateType(FORMAT_SCHEMA, false, true);
} else if (!isDerived) {
throw new ValidationException(
"A definition of a schema or derivation from the table's schema is required.");
}
}
}
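A small sketch of how this validator might be exercised against such a property map, assuming the usual factory pattern of wrapping the map in DescriptorProperties; the map contents are illustrative:

{% highlight java %}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.descriptors.CsvValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;

Map<String, String> map = new HashMap<>();
map.put("format.type", "csv");
map.put("format.property-version", "1");
map.put("format.derive-schema", "true");

// Wrap the map and validate: a ValidationException is thrown if, for example,
// both "format.schema" and "format.derive-schema" are set, or neither is present.
DescriptorProperties properties = new DescriptorProperties(true);
properties.putProperties(map);
new CsvValidator().validate(properties);
{% endhighlight %}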
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.csv;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.function.Consumer;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
/**
* Tests for {@link CsvRowSerializationSchema} and {@link CsvRowDeserializationSchema}.
*/
public class CsvRowDeSerializationSchemaTest {
@Test
@SuppressWarnings("unchecked")
public void testSerializeDeserialize() throws Exception {
testNullableField(Types.LONG, "null", null);
testNullableField(Types.STRING, "null", null);
testNullableField(Types.VOID, "null", null);
testNullableField(Types.STRING, "\"This is a test.\"", "This is a test.");
testNullableField(Types.STRING, "\"This is a test\n\r.\"", "This is a test\n\r.");
testNullableField(Types.BOOLEAN, "true", true);
testNullableField(Types.BOOLEAN, "null", null);
testNullableField(Types.BYTE, "124", (byte) 124);
testNullableField(Types.SHORT, "10000", (short) 10000);
testNullableField(Types.INT, "1234567", 1234567);
testNullableField(Types.LONG, "12345678910", 12345678910L);
testNullableField(Types.FLOAT, "0.33333334", 0.33333334f);
testNullableField(Types.DOUBLE, "0.33333333332", 0.33333333332d);
testNullableField(Types.BIG_DEC,
"\"1234.0000000000000000000000001\"",
new BigDecimal("1234.0000000000000000000000001"));
testNullableField(Types.BIG_INT,
"\"123400000000000000000000000000\"",
new BigInteger("123400000000000000000000000000"));
testNullableField(Types.SQL_DATE, "2018-10-12", Date.valueOf("2018-10-12"));
testNullableField(Types.SQL_TIME, "12:12:12", Time.valueOf("12:12:12"));
testNullableField(
Types.SQL_TIMESTAMP,
"\"2018-10-12 12:12:12.0\"",
Timestamp.valueOf("2018-10-12 12:12:12"));
testNullableField(
Types.ROW(Types.STRING, Types.INT, Types.BOOLEAN),
"Hello;42;false",
Row.of("Hello", 42, false));
testNullableField(
Types.OBJECT_ARRAY(Types.STRING),
"a;b;c",
new String[] {"a", "b", "c"});
testNullableField(
Types.OBJECT_ARRAY(Types.BYTE),
"12;4;null",
new Byte[] {12, 4, null});
testNullableField(
(TypeInformation<byte[]>) Types.PRIMITIVE_ARRAY(Types.BYTE),
"awML",
new byte[] {107, 3, 11});
}
@Test
public void testSerializeDeserializeCustomizedProperties() throws Exception {
final Consumer<CsvRowSerializationSchema.Builder> serConfig = (serSchemaBuilder) -> serSchemaBuilder
.setEscapeCharacter('*')
.setQuoteCharacter('\'')
.setArrayElementDelimiter(":")
.setFieldDelimiter(';');
final Consumer<CsvRowDeserializationSchema.Builder> deserConfig = (deserSchemaBuilder) -> deserSchemaBuilder
.setEscapeCharacter('*')
.setQuoteCharacter('\'')
.setArrayElementDelimiter(":")
.setFieldDelimiter(';');
testField(Types.STRING, "123*'4**", "123'4*", deserConfig, ";");
testField(Types.STRING, "'123''4**'", "123'4*", serConfig, deserConfig, ";");
testField(Types.STRING, "'a;b*'c'", "a;b'c", deserConfig, ";");
testField(Types.STRING, "'a;b''c'", "a;b'c", serConfig, deserConfig, ";");
testField(Types.INT, " 12 ", 12, deserConfig, ";");
testField(Types.INT, "12", 12, serConfig, deserConfig, ";");
testField(Types.ROW(Types.STRING, Types.STRING), "1:hello", Row.of("1", "hello"), deserConfig, ";");
testField(Types.ROW(Types.STRING, Types.STRING), "'1:hello'", Row.of("1", "hello"), serConfig, deserConfig, ";");
testField(Types.ROW(Types.STRING, Types.STRING), "'1:hello world'", Row.of("1", "hello world"), serConfig, deserConfig, ";");
testField(Types.STRING, "null", "null", serConfig, deserConfig, ";"); // string because null literal has not been set
}
@Test
public void testDeserializeParseError() throws Exception {
try {
testDeserialization(false, false, "Test,null,Test"); // null not supported
fail("Missing field should cause failure.");
} catch (IOException e) {
// valid exception
}
}
@Test
public void testDeserializeUnsupportedNull() throws Exception {
// unsupported null for integer
assertEquals(Row.of("Test", null, "Test"), testDeserialization(true, false, "Test,null,Test"));
}
@Test
public void testDeserializeIncompleteRow() throws Exception {
// last two columns are missing
assertEquals(Row.of("Test", null, null), testDeserialization(true, false, "Test"));
}
@Test
public void testDeserializeMoreColumnsThanExpected() throws Exception {
// one additional string column
assertNull(testDeserialization(true, false, "Test,12,Test,Test"));
}
@Test
public void testDeserializeIgnoreComment() throws Exception {
// # is part of the string
assertEquals(Row.of("#Test", 12, "Test"), testDeserialization(false, false, "#Test,12,Test"));
}
@Test
public void testDeserializeAllowComment() throws Exception {
// entire row is ignored
assertNull(testDeserialization(true, true, "#Test,12,Test"));
}
@Test
public void testSerializationProperties() throws Exception {
final TypeInformation<Row> rowInfo = Types.ROW(Types.STRING, Types.INT, Types.STRING);
final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(rowInfo)
.setLineDelimiter("\r");
assertArrayEquals(
"Test,12,Hello\r".getBytes(),
serialize(serSchemaBuilder, Row.of("Test", 12, "Hello")));
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidNesting() throws Exception {
testNullableField(Types.ROW(Types.ROW(Types.STRING)), "FAIL", Row.of(Row.of("FAIL")));
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidType() throws Exception {
testNullableField(Types.GENERIC(java.util.Date.class), "FAIL", new java.util.Date());
}
private <T> void testNullableField(TypeInformation<T> fieldInfo, String string, T value) throws Exception {
testField(
fieldInfo,
string,
value,
(deserSchema) -> deserSchema.setNullLiteral("null"),
(serSchema) -> serSchema.setNullLiteral("null"),
",");
}
private <T> void testField(
TypeInformation<T> fieldInfo,
String csvValue,
T value,
Consumer<CsvRowSerializationSchema.Builder> serializationConfig,
Consumer<CsvRowDeserializationSchema.Builder> deserializationConfig,
String fieldDelimiter) throws Exception {
final TypeInformation<Row> rowInfo = Types.ROW(Types.STRING, fieldInfo, Types.STRING);
final String expectedCsv = "BEGIN" + fieldDelimiter + csvValue + fieldDelimiter + "END\n";
final Row expectedRow = Row.of("BEGIN", value, "END");
// serialization
final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(rowInfo);
serializationConfig.accept(serSchemaBuilder);
final byte[] serializedRow = serialize(serSchemaBuilder, expectedRow);
assertEquals(expectedCsv, new String(serializedRow));
// deserialization
final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(rowInfo);
deserializationConfig.accept(deserSchemaBuilder);
final Row deserializedRow = deserialize(deserSchemaBuilder, expectedCsv);
assertEquals(expectedRow, deserializedRow);
}
private <T> void testField(
TypeInformation<T> fieldInfo,
String csvValue,
T value,
Consumer<CsvRowDeserializationSchema.Builder> deserializationConfig,
String fieldDelimiter) throws Exception {
final TypeInformation<Row> rowInfo = Types.ROW(Types.STRING, fieldInfo, Types.STRING);
final String csv = "BEGIN" + fieldDelimiter + csvValue + fieldDelimiter + "END\n";
final Row expectedRow = Row.of("BEGIN", value, "END");
// deserialization
final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(rowInfo);
deserializationConfig.accept(deserSchemaBuilder);
final Row deserializedRow = deserialize(deserSchemaBuilder, csv);
assertEquals(expectedRow, deserializedRow);
}
private Row testDeserialization(
boolean allowParsingErrors,
boolean allowComments,
String string) throws Exception {
final TypeInformation<Row> rowInfo = Types.ROW(Types.STRING, Types.INT, Types.STRING);
final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(rowInfo)
.setIgnoreParseErrors(allowParsingErrors)
.setAllowComments(allowComments);
return deserialize(deserSchemaBuilder, string);
}
private static byte[] serialize(CsvRowSerializationSchema.Builder serSchemaBuilder, Row row) throws Exception {
// we serialize and deserialize the schema to test runtime behavior
// when the schema is shipped to the cluster
final CsvRowSerializationSchema schema = InstantiationUtil.deserializeObject(
InstantiationUtil.serializeObject(serSchemaBuilder.build()),
CsvRowDeSerializationSchemaTest.class.getClassLoader());
return schema.serialize(row);
}
private static Row deserialize(CsvRowDeserializationSchema.Builder deserSchemaBuilder, String csv) throws Exception {
// we serialize and deserialize the schema to test runtime behavior
// when the schema is shipped to the cluster
final CsvRowDeserializationSchema schema = InstantiationUtil.deserializeObject(
InstantiationUtil.serializeObject(deserSchemaBuilder.build()),
CsvRowDeSerializationSchemaTest.class.getClassLoader());
return schema.deserialize(csv.getBytes());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.csv;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Testing for {@link CsvRowDeserializationSchema}.
*/
public class CsvRowDeserializationSchemaTest extends TestLogger {
@Test
public void testDeserialize() throws IOException {
final long currentMills = System.currentTimeMillis();
final TypeInformation<Row> rowTypeInfo = Types.ROW(
new String[]{"a", "b", "c", "d", "e", "f", "g"},
new TypeInformation[]{
Types.STRING(), Types.LONG(), Types.DECIMAL(),
Types.ROW(
new String[]{"c1", "c2", "c3"},
new TypeInformation[]{
Types.INT(),
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
Types.STRING()
}
), Types.SQL_TIMESTAMP(), Types.BOOLEAN(),
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
}
);
String c1 = "123";
String c2 = String.valueOf(34L);
String c3 = String.valueOf(1233.2);
String c4 = "1" + ";" + new String("abc".getBytes()) + ";" + "cba";
String c5 = new Timestamp(currentMills).toString();
String c6 = "true";
String c7 = new String("12345".getBytes());
byte[] bytes = (c1 + "," + c2 + "," + c3 + "," + c4 + ","
+ c5 + "," + c6 + "," + c7).getBytes();
CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo);
Row deserializedRow = deserializationSchema.deserialize(bytes);
assertEquals(7, deserializedRow.getArity());
assertEquals("123", deserializedRow.getField(0));
assertEquals(34L, deserializedRow.getField(1));
assertEquals(BigDecimal.valueOf(1233.2), deserializedRow.getField(2));
assertArrayEquals("abc".getBytes(), (byte[]) ((Row) deserializedRow.getField(3)).getField(1));
assertEquals(new Timestamp(currentMills), deserializedRow.getField(4));
assertEquals(true, deserializedRow.getField(5));
assertArrayEquals("12345".getBytes(), (byte[]) deserializedRow.getField(6));
}
@Test
public void testCustomizedProperties() throws IOException {
final TypeInformation<Row> rowTypeInfo = Types.ROW(
new String[]{"a", "b", "c"},
new TypeInformation[]{Types.STRING(), Types.STRING(),
Types.ROW(
new String[]{"c1", "c2"},
new TypeInformation[]{Types.INT(), Types.STRING()}
)}
);
String c1 = "123*\"4";
String c2 = "'a,bc'";
String c3 = "1:zxv";
byte[] bytes = (c1 + "," + c2 + "," + c3).getBytes();
CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo);
deserializationSchema.setEscapeCharacter('*');
deserializationSchema.setQuoteCharacter('\'');
deserializationSchema.setArrayElementDelimiter(":");
Row deserializedRow = deserializationSchema.deserialize(bytes);
assertEquals("123\"4", deserializedRow.getField(0));
assertEquals("a,bc", deserializedRow.getField(1));
assertEquals("zxv", ((Row) deserializedRow.getField(2)).getField(1));
}
@Test
public void testCharset() throws IOException {
final TypeInformation<Row> rowTypeInfo = Types.ROW(
new String[]{"a"},
new TypeInformation[]{PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}
);
final CsvRowDeserializationSchema schema = new CsvRowDeserializationSchema(rowTypeInfo);
schema.setCharset("UTF-16");
byte[] bytes = "abc".getBytes(StandardCharsets.UTF_16);
Row result = schema.deserialize(bytes);
assertEquals("abc", new String((byte[]) result.getField(0), StandardCharsets.UTF_16));
}
@Test
public void testNull() throws IOException {
final TypeInformation<Row> rowTypeInfo = Types.ROW(
new String[]{"a"},
new TypeInformation[]{Types.STRING()}
);
final byte[] bytes = "123".getBytes();
final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowTypeInfo);
deserializationSchema.setNullValue("123");
final Row row = deserializationSchema.deserialize(bytes);
assertNull(row.getField(0));
}
@Test(expected = IllegalArgumentException.class)
public void testNumberOfFieldNamesAndTypesMismatch() {
TypeInformation<Row> rowTypeInfo = Types.ROW(
new String[]{"a", "b"},
new TypeInformation[]{Types.STRING()}
);
new CsvRowDeserializationSchema(rowTypeInfo);
}
}
......@@ -21,11 +21,9 @@ package org.apache.flink.formats.csv;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.NoMatchingTableFactoryException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Descriptor;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
......@@ -35,16 +33,16 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
/**
* Testing for {@link CsvRowFormatFactory}.
* Tests for {@link CsvRowFormatFactory}.
*/
public class CsvRowFormatFactoryTest extends TestLogger {
private static final TypeInformation<Row> SCHEMA = Types.ROW(
new String[]{"a", "b", "c"},
new TypeInformation[]{Types.STRING(), Types.INT(), Types.ROW(
......@@ -55,67 +53,69 @@ public class CsvRowFormatFactoryTest extends TestLogger {
@Test
public void testSchema() {
final Map<String, String> properties = toMap(
new Csv()
.field("a", Types.STRING())
.field("b", Types.INT())
.field("c", Types.ROW(
new String[]{"a", "b", "c"},
new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()}
))
.arrayElementDelim("^^")
.escapeCharacter('c')
);
testSchemaSerializationSchema(properties);
testSchemaDeserializationSchema(properties);
}
@Test
public void testDerived() {
final Map<String, String> properties = toMap(
new Schema()
.field("a", Types.STRING())
.field("b", Types.INT())
.field("c", Types.ROW(
new String[]{"a", "b", "c"},
new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()}
)), new Csv().derived(true)
);
testSchemaSerializationSchema(properties);
testSchemaDeserializationSchema(properties);
}
final Map<String, String> properties = new Csv()
.schema(SCHEMA)
.fieldDelimiter(';')
.lineDelimiter("\r\n")
.quoteCharacter('\'')
.allowComments()
.ignoreParseErrors()
.arrayElementDelimiter("|")
.escapeCharacter('\\')
.nullLiteral("n/a")
.toProperties();
@Test(expected = NoMatchingTableFactoryException.class)
public void testUnsupportedProperties() {
final Map<String, String> properties = toMap(
new Csv()
.field("a", Types.STRING())
.lineDelimiter("%")
);
testSchemaSerializationSchema(properties);
}
final CsvRowDeserializationSchema expectedDeser = new CsvRowDeserializationSchema.Builder(SCHEMA)
.setFieldDelimiter(';')
.setQuoteCharacter('\'')
.setAllowComments(true)
.setIgnoreParseErrors(true)
.setArrayElementDelimiter("|")
.setEscapeCharacter('\\')
.setNullLiteral("n/a")
.build();
private void testSchemaDeserializationSchema(Map<String, String> properties) {
final DeserializationSchema<?> actual2 = TableFactoryService
final DeserializationSchema<?> actualDeser = TableFactoryService
.find(DeserializationSchemaFactory.class, properties)
.createDeserializationSchema(properties);
final CsvRowDeserializationSchema expected2 = new CsvRowDeserializationSchema(SCHEMA);
assertEquals(expected2, actual2);
}
private void testSchemaSerializationSchema(Map<String, String> properties) {
final SerializationSchema<?> actual1 = TableFactoryService
assertEquals(expectedDeser, actualDeser);
final CsvRowSerializationSchema expectedSer = new CsvRowSerializationSchema.Builder(SCHEMA)
.setFieldDelimiter(';')
.setLineDelimiter("\r\n")
.setQuoteCharacter('\'')
.setArrayElementDelimiter("|")
.setEscapeCharacter('\\')
.setNullLiteral("n/a")
.build();
final SerializationSchema<?> actualSer = TableFactoryService
.find(SerializationSchemaFactory.class, properties)
.createSerializationSchema(properties);
final SerializationSchema expected1 = new CsvRowSerializationSchema(SCHEMA);
assertEquals(expected1, actual1);
assertEquals(expectedSer, actualSer);
}
private static Map<String, String> toMap(Descriptor... desc) {
final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
for (Descriptor d : desc) {
d.addProperties(descriptorProperties);
}
return descriptorProperties.asMap();
@Test
public void testSchemaDerivation() {
final Map<String, String> properties = new HashMap<>();
properties.putAll(new Schema().schema(TableSchema.fromTypeInfo(SCHEMA)).toProperties());
properties.putAll(new Csv().deriveSchema().toProperties());
final CsvRowSerializationSchema expectedSer = new CsvRowSerializationSchema.Builder(SCHEMA).build();
final CsvRowDeserializationSchema expectedDeser = new CsvRowDeserializationSchema.Builder(SCHEMA).build();
final SerializationSchema<?> actualSer = TableFactoryService
.find(SerializationSchemaFactory.class, properties)
.createSerializationSchema(properties);
assertEquals(expectedSer, actualSer);
final DeserializationSchema<?> actualDeser = TableFactoryService
.find(DeserializationSchemaFactory.class, properties)
.createDeserializationSchema(properties);
assertEquals(expectedDeser, actualDeser);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.csv;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.util.TestLogger;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.ColumnType;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* Tests for {@link CsvRowSchemaConverter}.
*/
public class CsvRowSchemaConverterTest extends TestLogger {
@Test
public void testRowToCsvSchema() {
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation<?>[] {
Types.STRING,
Types.LONG,
Types.ROW(Types.STRING),
Types.BIG_DEC,
Types.BOOLEAN,
BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
},
new String[]{"a", "b", "c", "d", "e", "f", "g"}
);
CsvSchema expect = CsvSchema.builder()
.addColumn("a", ColumnType.STRING)
.addColumn("b", ColumnType.NUMBER)
.addColumn("c", ColumnType.ARRAY)
.addColumn("d", ColumnType.NUMBER)
.addColumn("e", ColumnType.BOOLEAN)
.addColumn("f", ColumnType.ARRAY)
.addColumn("g", ColumnType.STRING)
.build();
CsvSchema actual = CsvRowSchemaConverter.rowTypeToCsvSchema(rowTypeInfo);
assertEquals(expect.toString(), actual.toString());
}
@Test(expected = RuntimeException.class)
public void testUnsupportedType() {
CsvRowSchemaConverter.rowTypeToCsvSchema(new RowTypeInfo(
new TypeInformation[]{Types.STRING,
PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO},
new String[]{"a", "b"}
));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.csv;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
/**
* Testing for {@link CsvRowSerializationSchema}.
*/
public class CsvRowSerializationSchemaTest extends TestLogger {
@Test
public void testSerializeAndDeserialize() throws IOException {
final String[] fields = new String[]{"a", "b", "c", "d", "e", "f", "g", "h"};
final TypeInformation[] types = new TypeInformation[]{
Types.BOOLEAN(), Types.STRING(), Types.INT(), Types.DECIMAL(),
Types.SQL_TIMESTAMP(), PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
Types.ROW(
new String[]{"g1", "g2"},
new TypeInformation[]{Types.STRING(), Types.LONG()}),
Types.STRING()
};
final TypeInformation<Row> rowSchema = Types.ROW(fields, types);
final Row row = new Row(8);
final Row nestedRow = new Row(2);
nestedRow.setField(0, "z\"xcv");
nestedRow.setField(1, 123L);
row.setField(0, true);
row.setField(1, "abcd");
row.setField(2, 1);
row.setField(3, BigDecimal.valueOf(1.2334));
row.setField(4, new Timestamp(System.currentTimeMillis()));
row.setField(5, "qwecxcr".getBytes());
row.setField(6, nestedRow);
row.setField(7, null);
final Row resultRow = serializeAndDeserialize(rowSchema, row);
assertEquals(row, resultRow);
}
@Test
public void testSerialize() {
long currentMillis = System.currentTimeMillis();
Row row = new Row(4);
Row nestedRow = new Row(2);
row.setField(0, "abc");
row.setField(1, 34);
row.setField(2, new Timestamp(currentMillis));
nestedRow.setField(0, "bc");
nestedRow.setField(1, "qwertyu".getBytes());
row.setField(3, nestedRow);
final TypeInformation<Row> typeInfo = Types.ROW(
new String[]{"a", "b", "c", "d"},
new TypeInformation[]{Types.STRING(), Types.INT(), Types.SQL_TIMESTAMP(),
Types.ROW(
new String[]{"d1", "d2"},
new TypeInformation[]{Types.STRING(), PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}
)}
);
final CsvRowSerializationSchema schema = new CsvRowSerializationSchema(typeInfo);
byte[] result = schema.serialize(row);
String c1 = "abc";
String c2 = String.valueOf(34);
String c3 = "\"" + new Timestamp(currentMillis).toString() + "\"";
String c4 = "bc;" + new String("qwertyu".getBytes());
byte[] expect = (c1 + "," + c2 + "," + c3 + "," + c4 + "\n").getBytes();
assertArrayEquals(expect, result);
}
@Test
public void testCustomizedProperties() throws IOException {
final TypeInformation<Row> rowTypeInfo = Types.ROW(
new String[]{"a", "b", "c"},
new TypeInformation[]{Types.STRING(), Types.STRING(),
Types.ROW(
new String[]{"c1", "c2"},
new TypeInformation[]{Types.INT(), Types.STRING()}
)}
);
final Row row = new Row(3);
final Row nestedRow = new Row(2);
nestedRow.setField(0, 1);
nestedRow.setField(1, "zxv");
row.setField(0, "12*3'4");
row.setField(1, "a,bc");
row.setField(2, nestedRow);
final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo);
serializationSchema.setEscapeCharacter('*');
serializationSchema.setQuoteCharacter('\'');
serializationSchema.setArrayElementDelimiter(":");
byte[] result = serializationSchema.serialize(row);
final String c1 = "'12**3''4'";
final String c2 = "'a,bc'";
final String c3 = "1:zxv";
byte[] expect = (c1 + "," + c2 + "," + c3 + "\n").getBytes();
assertArrayEquals(expect, result);
}
@Test
public void testCharset() throws UnsupportedEncodingException {
final TypeInformation<Row> rowTypeInfo = Types.ROW(
new String[]{"a"},
new TypeInformation[]{PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}
);
final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo);
serializationSchema.setCharset("UTF-16");
final Row row = new Row(1);
row.setField(0, "123".getBytes(StandardCharsets.UTF_16));
byte[] result = serializationSchema.serialize(row);
byte[] expect = "123\n".getBytes();
assertArrayEquals(expect, result);
}
@Test
public void testSerializationOfTwoRows() throws IOException {
final TypeInformation<Row> rowSchema = Types.ROW(
new String[] {"f1", "f2", "f3"},
new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()});
final Row row1 = new Row(3);
row1.setField(0, 1);
row1.setField(1, true);
row1.setField(2, "str");
final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema);
final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowSchema);
byte[] bytes = serializationSchema.serialize(row1);
assertEquals(row1, deserializationSchema.deserialize(bytes));
final Row row2 = new Row(3);
row2.setField(0, 10);
row2.setField(1, false);
row2.setField(2, "newStr");
bytes = serializationSchema.serialize(row2);
assertEquals(row2, deserializationSchema.deserialize(bytes));
}
@Test
public void testNull() {
final TypeInformation<Row> rowTypeInfo = Types.ROW(
new String[]{"a"},
new TypeInformation[]{Types.STRING()}
);
final Row row = new Row(1);
row.setField(0, null);
final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowTypeInfo);
serializationSchema.setNullValue("123");
byte[] bytes = serializationSchema.serialize(row);
assertArrayEquals("123\n".getBytes(), bytes);
}
@Test(expected = RuntimeException.class)
public void testSerializeRowWithInvalidNumberOfFields() {
final TypeInformation<Row> rowSchema = Types.ROW(
new String[] {"f1", "f2", "f3"},
new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()});
final Row row = new Row(1);
row.setField(0, 1);
final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema);
serializationSchema.serialize(row);
}
@Test(expected = RuntimeException.class)
public void testSerializeNestedRowInNestedRow() {
final TypeInformation<Row> rowSchema = Types.ROW(
new String[]{"a"},
new TypeInformation[]{Types.ROW(
new String[]{"a1"},
new TypeInformation[]{Types.ROW(
new String[]{"a11"},
new TypeInformation[]{Types.STRING()}
)}
)}
);
final Row row = new Row(1);
final Row nestedRow = new Row(1);
final Row doubleNestedRow = new Row(1);
doubleNestedRow.setField(0, "123");
nestedRow.setField(0, doubleNestedRow);
row.setField(0, nestedRow);
final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema);
serializationSchema.serialize(row);
}
private Row serializeAndDeserialize(TypeInformation<Row> rowSchema, Row row) throws IOException {
final CsvRowSerializationSchema serializationSchema = new CsvRowSerializationSchema(rowSchema);
final CsvRowDeserializationSchema deserializationSchema = new CsvRowDeserializationSchema(rowSchema);
final byte[] bytes = serializationSchema.serialize(row);
return deserializationSchema.deserialize(bytes);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.types.Row;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Tests for the {@link Csv} descriptor.
*/
public class CsvTest extends DescriptorTestBase {
private static final TypeInformation<Row> SCHEMA = Types.ROW(
new String[]{"a", "b", "c"},
new TypeInformation[]{Types.STRING(), Types.INT(), Types.ROW(
new String[]{"a", "b", "c"},
new TypeInformation[]{Types.STRING(), Types.INT(), Types.BOOLEAN()}
)}
);
private static final Descriptor CUSTOM_DESCRIPTOR_WITH_SCHEMA = new Csv()
.schema(SCHEMA)
.fieldDelimiter(';')
.lineDelimiter("\r\n")
.quoteCharacter('\'')
.allowComments()
.ignoreParseErrors()
.arrayElementDelimiter("|")
.escapeCharacter('\\')
.nullLiteral("n/a");
private static final Descriptor MINIMAL_DESCRIPTOR_WITH_DERIVED_SCHEMA = new Csv()
.deriveSchema();
@Test(expected = ValidationException.class)
public void testInvalidAllowComments() {
addPropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.allow-comments", "DDD");
}
@Test(expected = ValidationException.class)
public void testMissingSchema() {
removePropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.schema");
}
@Test(expected = ValidationException.class)
public void testDuplicateSchema() {
// we add an additional schema
addPropertyAndVerify(
MINIMAL_DESCRIPTOR_WITH_DERIVED_SCHEMA,
"format.schema",
"ROW<a VARCHAR, b INT, c ROW<a VARCHAR, b INT, c BOOLEAN>>");
}
// --------------------------------------------------------------------------------------------
@Override
public List<Descriptor> descriptors() {
return Arrays.asList(CUSTOM_DESCRIPTOR_WITH_SCHEMA, MINIMAL_DESCRIPTOR_WITH_DERIVED_SCHEMA);
}
@Override
public List<Map<String, String>> properties() {
final Map<String, String> props1 = new HashMap<>();
props1.put("format.type", "csv");
props1.put("format.property-version", "1");
props1.put("format.schema", "ROW<a VARCHAR, b INT, c ROW<a VARCHAR, b INT, c BOOLEAN>>");
props1.put("format.field-delimiter", ";");
props1.put("format.line-delimiter", "\r\n");
props1.put("format.quote-character", "'");
props1.put("format.allow-comments", "true");
props1.put("format.ignore-parse-errors", "true");
props1.put("format.array-element-delimiter", "|");
props1.put("format.escape-character", "\\");
props1.put("format.null-literal", "n/a");
final Map<String, String> props2 = new HashMap<>();
props2.put("format.type", "csv");
props2.put("format.property-version", "1");
props2.put("format.derive-schema", "true");
return Arrays.asList(props1, props2);
}
@Override
public DescriptorValidator validator() {
return new CsvValidator();
}
}
......@@ -60,11 +60,6 @@ under the License.
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
</dependency>
<!-- JSON table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
......
......@@ -46,7 +46,7 @@ import java.util.Objects;
* <p>Deserializes a <code>byte[]</code> message as a JSON object and reads
* the specified fields.
*
* <p>Failure during deserialization are forwarded as wrapped IOExceptions.
* <p>Failures during deserialization are forwarded as wrapped IOExceptions.
*/
@PublicEvolving
public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
......
......@@ -93,11 +93,11 @@ public class Json extends FormatDescriptor {
}
/**
* Derives the format schema from the table's schema described using {@link Schema}.
* Derives the format schema from the table's schema.
*
* <p>This allows for defining schema information only once.
*
* <p>The names, types, and field order of the format are determined by the table's
* <p>The names, types, and fields' order of the format are determined by the table's
* schema. Time attributes are ignored if their origin is not a field. A "from" definition
* is interpreted as a field renaming in the format.
*/
......
......@@ -58,6 +58,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
......
......@@ -22,7 +22,7 @@ import java.util
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableSchema, ValidationException}
import org.apache.flink.table.descriptors.CsvValidator._
import org.apache.flink.table.descriptors.OldCsvValidator._
import org.apache.flink.table.utils.TypeStringUtils
import scala.collection.mutable
......@@ -30,8 +30,18 @@ import scala.collection.JavaConverters._
/**
* Format descriptor for comma-separated values (CSV).
*
* Note: This descriptor describes Flink's non-standard CSV table source/sink. In the future, the
* descriptor will be replaced by a proper RFC-compliant version. Use the RFC-compliant `Csv`
* format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use the
* old one for stream/batch filesystem operations for now.
*
* @deprecated Use the RFC-compliant `Csv` format in the dedicated
* `flink-formats/flink-csv` module instead when writing to Kafka.
*/
class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
@Deprecated
@deprecated
class OldCsv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
private var fieldDelim: Option[String] = None
private var lineDelim: Option[String] = None
......@@ -41,17 +51,13 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
private var commentPrefix: Option[String] = None
private var isIgnoreFirstLine: Option[Boolean] = None
private var lenient: Option[Boolean] = None
private var arrayElementDelim: Option[String] = None
private var escapeCharacter: Option[Character] = None
private var bytesCharset: Option[String] = None
private var derived: Option[Boolean] = None
/**
* Sets the field delimiter, "," by default.
*
* @param delim the field delimiter
*/
def fieldDelimiter(delim: String): Csv = {
def fieldDelimiter(delim: String): OldCsv = {
this.fieldDelim = Some(delim)
this
}
......@@ -61,7 +67,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
*
* @param delim the line delimiter
*/
def lineDelimiter(delim: String): Csv = {
def lineDelimiter(delim: String): OldCsv = {
this.lineDelim = Some(delim)
this
}
......@@ -74,7 +80,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
*
* @param schema the table schema
*/
def schema(schema: TableSchema): Csv = {
def schema(schema: TableSchema): OldCsv = {
this.schema.clear()
schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
field(n, t)
......@@ -90,7 +96,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
* @param fieldName the field name
* @param fieldType the type information of the field
*/
def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
def field(fieldName: String, fieldType: TypeInformation[_]): OldCsv = {
field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
this
}
......@@ -103,7 +109,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
* @param fieldName the field name
* @param fieldType the type string of the field
*/
def field(fieldName: String, fieldType: String): Csv = {
def field(fieldName: String, fieldType: String): OldCsv = {
if (schema.contains(fieldName)) {
throw new ValidationException(s"Duplicate field name $fieldName.")
}
......@@ -116,7 +122,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
*
* @param quote the quote character
*/
def quoteCharacter(quote: Character): Csv = {
def quoteCharacter(quote: Character): OldCsv = {
this.quoteCharacter = Option(quote)
this
}
......@@ -126,7 +132,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
*
* @param prefix the prefix to indicate comments
*/
def commentPrefix(prefix: String): Csv = {
def commentPrefix(prefix: String): OldCsv = {
this.commentPrefix = Option(prefix)
this
}
......@@ -134,7 +140,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
/**
* Ignore the first line. By default, the first line is not skipped.
*/
def ignoreFirstLine(): Csv = {
def ignoreFirstLine(): OldCsv = {
this.isIgnoreFirstLine = Some(true)
this
}
......@@ -142,45 +148,14 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
/**
* Skip records with parse errors instead of failing. By default, an exception is thrown.
*/
def ignoreParseErrors(): Csv = {
def ignoreParseErrors(): OldCsv = {
this.lenient = Some(true)
this
}
/**
* Set delimiter of array elements, ';' by default.
*/
def arrayElementDelim(delim: String): Csv = {
this.arrayElementDelim = Some(delim)
this
}
/**
* Set escape character, none by default.
*/
def escapeCharacter(escape: Character): Csv = {
this.escapeCharacter = Some(escape)
this
}
/**
* Sets the charset of byte[] fields, 'UTF-8' by default.
*/
def bytesCharset(charset: String): Csv = {
this.bytesCharset = Some(charset)
this
}
/**
* Set to true if the format schema should be derived from the table schema.
*/
def derived(derived: Boolean): Csv = {
this.derived = Some(derived)
this
}
override protected def toFormatProperties: util.Map[String, String] = {
val properties = new DescriptorProperties()
fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
......@@ -206,14 +181,20 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
/**
* Format descriptor for comma-separated values (CSV).
*/
object Csv {
object OldCsv {
/**
* Format descriptor for comma-separated values (CSV).
*
* @deprecated Use `new Csv()`.
* Note: This descriptor describes Flink's non-standard CSV table source/sink. In the future, the
* descriptor will be replaced by a proper RFC-compliant version. Use the RFC-compliant `Csv`
* format in the dedicated `flink-formats/flink-csv` module instead when writing to Kafka. Use
* the old one for stream/batch filesystem operations for now.
*
* @deprecated Use the RFC-compliant `Csv` format in the dedicated
* `flink-formats/flink-csv` module instead when writing to Kafka.
*/
@deprecated
def apply(): Csv = new Csv()
def apply(): OldCsv = new OldCsv()
}
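A hedged sketch of the migration path the deprecation note describes: keep OldCsv for the filesystem connector, and use the RFC-compliant Csv from flink-formats/flink-csv when writing to Kafka. The Kafka descriptor settings, registerTableSource(), and the new Csv's deriveSchema() are assumed context, not part of this change.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;        // new RFC-compliant format (flink-csv)
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.OldCsv;     // deprecated non-standard format
import org.apache.flink.table.descriptors.Schema;

public class CsvMigrationSketch {

  static void register(StreamTableEnvironment tableEnv) {
    // filesystem connector: keep the old, non-standard CSV format for now
    tableEnv.connect(new FileSystem().path("/path/to/legacy.csv"))
      .withFormat(new OldCsv()
        .field("id", Types.LONG)
        .field("name", Types.STRING))
      .withSchema(new Schema()
        .field("id", Types.LONG)
        .field("name", Types.STRING))
      .registerTableSource("legacy_csv");

    // Kafka: use the RFC-compliant Csv format and derive its schema from the table schema
    tableEnv.connect(new Kafka().version("0.11").topic("rows"))
      .withFormat(new Csv().deriveSchema())
      .withSchema(new Schema()
        .field("id", Types.LONG)
        .field("name", Types.STRING))
      .registerTableSource("kafka_csv");
  }
}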
......@@ -18,14 +18,18 @@
package org.apache.flink.table.descriptors
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.descriptors.CsvValidator._
import org.apache.flink.table.descriptors.OldCsvValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE
/**
* Validator for [[Csv]].
* Validator for [[OldCsv]].
*
* @deprecated Use the RFC-compliant `Csv` format in the dedicated
* `flink-formats/flink-csv` module instead.
*/
class CsvValidator extends FormatDescriptorValidator {
@Deprecated
@deprecated
class OldCsvValidator extends FormatDescriptorValidator {
override def validate(properties: DescriptorProperties): Unit = {
super.validate(properties)
......@@ -34,30 +38,13 @@ class CsvValidator extends FormatDescriptorValidator {
properties.validateString(FORMAT_LINE_DELIMITER, true, 1)
properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1)
properties.validateString(FORMAT_COMMENT_PREFIX, true, 1)
properties.validateString(FORMAT_ARRAY_ELEMENT_DELIMITER, true, 1)
properties.validateString(FORMAT_ESCAPE_CHARACTER, true, 1, 1)
properties.validateString(FORMAT_BYTES_CHARSET, true, 1)
properties.validateString(FORMAT_NULL_VALUE, true, 1)
properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, true)
properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true)
properties.validateBoolean(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA, true)
val tableSchema = properties.getOptionalTableSchema(FORMAT_FIELDS)
val derived = properties.getOptionalBoolean(
FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA).orElse(false)
if (derived && tableSchema.isPresent) {
throw new ValidationException(
"Format cannot define a schema and derive from the table's schema at the same time.")
} else if (tableSchema.isPresent) {
properties.validateTableSchema(FORMAT_FIELDS, false)
} else if (!tableSchema.isPresent && derived) {
throw new ValidationException(
"A definition of a schema or derive from the table's schema is required.")
}
properties.validateTableSchema(FORMAT_FIELDS, false)
}
}
object CsvValidator {
object OldCsvValidator {
val FORMAT_TYPE_VALUE = "csv"
val FORMAT_FIELD_DELIMITER = "format.field-delimiter"
......@@ -67,8 +54,4 @@ object CsvValidator {
val FORMAT_IGNORE_FIRST_LINE = "format.ignore-first-line"
val FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors"
val FORMAT_FIELDS = "format.fields"
val FORMAT_ARRAY_ELEMENT_DELIMITER = "format.array-element-delimiter"
val FORMAT_ESCAPE_CHARACTER = "format.escape-character"
val FORMAT_BYTES_CHARSET = "format.bytes-charset"
val FORMAT_NULL_VALUE = "format.null-value"
}
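For orientation, a hedged sketch of the flat property map this validator checks, roughly what OldCsv#toFormatProperties would emit for a two-field schema; the exact format.fields.# key layout and type strings are assumptions.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.OldCsvValidator;

public class OldCsvPropertiesSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("format.type", "csv");
    props.put("format.property-version", "1");
    props.put("format.field-delimiter", "|");
    props.put("format.ignore-first-line", "true");
    props.put("format.fields.0.name", "id");      // assumed key layout
    props.put("format.fields.0.type", "BIGINT");
    props.put("format.fields.1.name", "name");
    props.put("format.fields.1.type", "VARCHAR");

    DescriptorProperties descriptorProperties = new DescriptorProperties(true);
    descriptorProperties.putProperties(props);
    new OldCsvValidator().validate(descriptorProperties); // throws ValidationException on bad input
  }
}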
......@@ -22,7 +22,7 @@ import java.util
import org.apache.flink.table.api.TableException
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
import org.apache.flink.table.descriptors.CsvValidator._
import org.apache.flink.table.descriptors.OldCsvValidator._
import org.apache.flink.table.descriptors.FileSystemValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.SchemaValidator._
......@@ -69,7 +69,7 @@ abstract class CsvTableSinkFactoryBase extends TableFactory {
// validate
new FileSystemValidator().validate(params)
new CsvValidator().validate(params)
new OldCsvValidator().validate(params)
new SchemaValidator(
isStreaming,
supportsSourceTimestamps = false,
......
......@@ -22,7 +22,7 @@ import java.util
import org.apache.flink.table.api.TableException
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
import org.apache.flink.table.descriptors.CsvValidator._
import org.apache.flink.table.descriptors.OldCsvValidator._
import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA
......@@ -74,7 +74,7 @@ abstract class CsvTableSourceFactoryBase extends TableFactory {
// validate
new FileSystemValidator().validate(params)
new CsvValidator().validate(params)
new OldCsvValidator().validate(params)
new SchemaValidator(
isStreaming,
supportsSourceTimestamps = false,
......
......@@ -28,9 +28,9 @@ import org.junit.Test
import scala.collection.JavaConverters._
/**
* Tests for [[Csv]].
* Tests for [[OldCsv]].
*/
class CsvTest extends DescriptorTestBase {
class OldCsvTest extends DescriptorTestBase {
@Test(expected = classOf[ValidationException])
def testInvalidType(): Unit = {
......@@ -47,20 +47,10 @@ class CsvTest extends DescriptorTestBase {
addPropertyAndVerify(descriptors().get(0), "format.quote-character", "qq")
}
@Test(expected = classOf[ValidationException])
def testTwoSchemas(): Unit = {
addPropertyAndVerify(descriptors().get(0), "format.derive-schema", "true")
}
@Test
def testOneSchema(): Unit = {
addPropertyAndVerify(descriptors().get(0), "format.derive-schema", "false")
}
// ----------------------------------------------------------------------------------------------
override def descriptors(): util.List[Descriptor] = {
val desc1 = Csv()
val desc1 = OldCsv()
.field("field1", "STRING")
.field("field2", Types.SQL_TIMESTAMP)
.field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]]))
......@@ -69,7 +59,7 @@ class CsvTest extends DescriptorTestBase {
Array[TypeInformation[_]](Types.INT, Types.STRING)))
.lineDelimiter("^")
val desc2 = Csv()
val desc2 = OldCsv()
.schema(new TableSchema(
Array[String]("test", "row"),
Array[TypeInformation[_]](Types.INT, Types.STRING)))
......@@ -107,6 +97,6 @@ class CsvTest extends DescriptorTestBase {
}
override def validator(): DescriptorValidator = {
new CsvValidator()
new OldCsvValidator()
}
}
......@@ -59,7 +59,7 @@ class TableDescriptorTest extends TableTestBase {
val connector = FileSystem()
.path("/path/to/csv")
val format = Csv()
val format = OldCsv()
.field("myfield", Types.STRING)
.field("myfield2", Types.INT)
.field("myfield3", Types.MAP(Types.STRING, Types.INT))
......
......@@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.catalog._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.descriptors.{OldCsv, FileSystem, Schema}
import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource}
object CommonTestData {
......@@ -71,7 +71,7 @@ object CommonTestData {
val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"), "csv-test1", "tmp")
val connDesc1 = FileSystem().path(tempFilePath1)
val formatDesc1 = Csv()
val formatDesc1 = OldCsv()
.field("a", Types.INT)
.field("b", Types.LONG)
.field("c", Types.STRING)
......@@ -110,7 +110,7 @@ object CommonTestData {
val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"), "csv-test2", "tmp")
val connDesc2 = FileSystem().path(tempFilePath2)
val formatDesc2 = Csv()
val formatDesc2 = OldCsv()
.field("d", Types.INT)
.field("e", Types.LONG)
.field("f", Types.INT)
......@@ -135,7 +135,7 @@ object CommonTestData {
val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
val connDesc3 = FileSystem().path(tempFilePath3)
val formatDesc3 = Csv()
val formatDesc3 = OldCsv()
.field("x", Types.INT)
.field("y", Types.LONG)
.field("z", Types.STRING)
......
......@@ -66,6 +66,8 @@ flink-filesystems/flink-s3-fs-hadoop,\
flink-filesystems/flink-s3-fs-presto,\
flink-formats/flink-avro,\
flink-formats/flink-parquet,\
flink-formats/flink-json,\
flink-formats/flink-csv,\
flink-connectors/flink-hbase,\
flink-connectors/flink-hcatalog,\
flink-connectors/flink-hadoop-compatibility,\
......